mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-27 04:29:58 +00:00
Merge pull request #2559 from quickwit-oss/trinity/sstable-partial-automaton
allow warming partially an sstable for an automaton
This commit is contained in:
@@ -67,6 +67,7 @@ tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-
|
|||||||
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
|
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
|
||||||
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
|
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
|
||||||
futures-util = { version = "0.3.28", optional = true }
|
futures-util = { version = "0.3.28", optional = true }
|
||||||
|
futures-channel = { version = "0.3.28", optional = true }
|
||||||
fnv = "1.0.7"
|
fnv = "1.0.7"
|
||||||
|
|
||||||
[target.'cfg(windows)'.dependencies]
|
[target.'cfg(windows)'.dependencies]
|
||||||
@@ -121,7 +122,7 @@ zstd-compression = ["zstd"]
|
|||||||
failpoints = ["fail", "fail/failpoints"]
|
failpoints = ["fail", "fail/failpoints"]
|
||||||
unstable = [] # useful for benches.
|
unstable = [] # useful for benches.
|
||||||
|
|
||||||
quickwit = ["sstable", "futures-util"]
|
quickwit = ["sstable", "futures-util", "futures-channel"]
|
||||||
|
|
||||||
# Compares only the hash of a string when indexing data.
|
# Compares only the hash of a string when indexing data.
|
||||||
# Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.
|
# Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.
|
||||||
|
|||||||
@@ -3,6 +3,12 @@ use std::io;
|
|||||||
use common::json_path_writer::JSON_END_OF_PATH;
|
use common::json_path_writer::JSON_END_OF_PATH;
|
||||||
use common::BinarySerializable;
|
use common::BinarySerializable;
|
||||||
use fnv::FnvHashSet;
|
use fnv::FnvHashSet;
|
||||||
|
#[cfg(feature = "quickwit")]
|
||||||
|
use futures_util::{FutureExt, StreamExt, TryStreamExt};
|
||||||
|
#[cfg(feature = "quickwit")]
|
||||||
|
use itertools::Itertools;
|
||||||
|
#[cfg(feature = "quickwit")]
|
||||||
|
use tantivy_fst::automaton::{AlwaysMatch, Automaton};
|
||||||
|
|
||||||
use crate::directory::FileSlice;
|
use crate::directory::FileSlice;
|
||||||
use crate::positions::PositionReader;
|
use crate::positions::PositionReader;
|
||||||
@@ -219,13 +225,18 @@ impl InvertedIndexReader {
|
|||||||
self.termdict.get_async(term.serialized_value_bytes()).await
|
self.termdict.get_async(term.serialized_value_bytes()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_term_range_async(
|
async fn get_term_range_async<'a, A: Automaton + 'a>(
|
||||||
&self,
|
&'a self,
|
||||||
terms: impl std::ops::RangeBounds<Term>,
|
terms: impl std::ops::RangeBounds<Term>,
|
||||||
|
automaton: A,
|
||||||
limit: Option<u64>,
|
limit: Option<u64>,
|
||||||
) -> io::Result<impl Iterator<Item = TermInfo> + '_> {
|
merge_holes_under_bytes: usize,
|
||||||
|
) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
|
||||||
|
where
|
||||||
|
A::State: Clone,
|
||||||
|
{
|
||||||
use std::ops::Bound;
|
use std::ops::Bound;
|
||||||
let range_builder = self.termdict.range();
|
let range_builder = self.termdict.search(automaton);
|
||||||
let range_builder = match terms.start_bound() {
|
let range_builder = match terms.start_bound() {
|
||||||
Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
|
Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
|
||||||
Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
|
Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
|
||||||
@@ -242,7 +253,9 @@ impl InvertedIndexReader {
|
|||||||
range_builder
|
range_builder
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut stream = range_builder.into_stream_async().await?;
|
let mut stream = range_builder
|
||||||
|
.into_stream_async_merging_holes(merge_holes_under_bytes)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
|
let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
|
||||||
|
|
||||||
@@ -288,7 +301,9 @@ impl InvertedIndexReader {
|
|||||||
limit: Option<u64>,
|
limit: Option<u64>,
|
||||||
with_positions: bool,
|
with_positions: bool,
|
||||||
) -> io::Result<bool> {
|
) -> io::Result<bool> {
|
||||||
let mut term_info = self.get_term_range_async(terms, limit).await?;
|
let mut term_info = self
|
||||||
|
.get_term_range_async(terms, AlwaysMatch, limit, 0)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let Some(first_terminfo) = term_info.next() else {
|
let Some(first_terminfo) = term_info.next() else {
|
||||||
// no key matches, nothing more to load
|
// no key matches, nothing more to load
|
||||||
@@ -315,6 +330,84 @@ impl InvertedIndexReader {
|
|||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Warmup a block postings given a range of `Term`s.
|
||||||
|
/// This method is for an advanced usage only.
|
||||||
|
///
|
||||||
|
/// returns a boolean, whether a term matching the range was found in the dictionary
|
||||||
|
pub async fn warm_postings_automaton<
|
||||||
|
A: Automaton + Clone + Send + 'static,
|
||||||
|
E: FnOnce(Box<dyn FnOnce() -> io::Result<()> + Send>) -> F,
|
||||||
|
F: std::future::Future<Output = io::Result<()>>,
|
||||||
|
>(
|
||||||
|
&self,
|
||||||
|
automaton: A,
|
||||||
|
// with_positions: bool, at the moment we have no use for it, and supporting it would add
|
||||||
|
// complexity to the coalesce
|
||||||
|
executor: E,
|
||||||
|
) -> io::Result<bool>
|
||||||
|
where
|
||||||
|
A::State: Clone,
|
||||||
|
{
|
||||||
|
// merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
|
||||||
|
// S3 (~80MiB/s, and 50ms latency)
|
||||||
|
const MERGE_HOLES_UNDER_BYTES: usize = (80 * 1024 * 1024 * 50) / 1000;
|
||||||
|
// we build a first iterator to download everything. Simply calling the function already
|
||||||
|
// download everything we need from the sstable, but doesn't start iterating over it.
|
||||||
|
let _term_info_iter = self
|
||||||
|
.get_term_range_async(.., automaton.clone(), None, MERGE_HOLES_UNDER_BYTES)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let (sender, posting_ranges_to_load_stream) = futures_channel::mpsc::unbounded();
|
||||||
|
let termdict = self.termdict.clone();
|
||||||
|
let cpu_bound_task = move || {
|
||||||
|
// then we build a 2nd iterator, this one with no holes, so we don't go through blocks
|
||||||
|
// we can't match.
|
||||||
|
// This makes the assumption there is a caching layer below us, which gives sync read
|
||||||
|
// for free after the initial async access. This might not always be true, but is in
|
||||||
|
// Quickwit.
|
||||||
|
// We build things from this closure otherwise we get into lifetime issues that can only
|
||||||
|
// be solved with self referential strucs. Returning an io::Result from here is a bit
|
||||||
|
// more leaky abstraction-wise, but a lot better than the alternative
|
||||||
|
let mut stream = termdict.search(automaton).into_stream()?;
|
||||||
|
|
||||||
|
// we could do without an iterator, but this allows us access to coalesce which simplify
|
||||||
|
// things
|
||||||
|
let posting_ranges_iter =
|
||||||
|
std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()));
|
||||||
|
|
||||||
|
let merged_posting_ranges_iter = posting_ranges_iter.coalesce(|range1, range2| {
|
||||||
|
if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
|
||||||
|
Ok(range1.start..range2.end)
|
||||||
|
} else {
|
||||||
|
Err((range1, range2))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for posting_range in merged_posting_ranges_iter {
|
||||||
|
if let Err(_) = sender.unbounded_send(posting_range) {
|
||||||
|
// this should happen only when search is cancelled
|
||||||
|
return Err(io::Error::other("failed to send posting range back"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
let task_handle = executor(Box::new(cpu_bound_task));
|
||||||
|
|
||||||
|
let posting_downloader = posting_ranges_to_load_stream
|
||||||
|
.map(|posting_slice| {
|
||||||
|
self.postings_file_slice
|
||||||
|
.read_bytes_slice_async(posting_slice)
|
||||||
|
.map(|result| result.map(|_slice| ()))
|
||||||
|
})
|
||||||
|
.buffer_unordered(5)
|
||||||
|
.try_collect::<Vec<()>>();
|
||||||
|
|
||||||
|
let (_, slices_downloaded) =
|
||||||
|
futures_util::future::try_join(task_handle, posting_downloader).await?;
|
||||||
|
|
||||||
|
Ok(!slices_downloaded.is_empty())
|
||||||
|
}
|
||||||
|
|
||||||
/// Warmup the block postings for all terms.
|
/// Warmup the block postings for all terms.
|
||||||
/// This method is for an advanced usage only.
|
/// This method is for an advanced usage only.
|
||||||
///
|
///
|
||||||
|
|||||||
@@ -93,6 +93,7 @@ impl TermInfoBlockMeta {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct TermInfoStore {
|
pub struct TermInfoStore {
|
||||||
num_terms: usize,
|
num_terms: usize,
|
||||||
block_meta_bytes: OwnedBytes,
|
block_meta_bytes: OwnedBytes,
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use std::io::{self, Write};
|
use std::io::{self, Write};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use common::{BinarySerializable, CountingWriter};
|
use common::{BinarySerializable, CountingWriter};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
@@ -113,8 +114,9 @@ static EMPTY_TERM_DICT_FILE: Lazy<FileSlice> = Lazy::new(|| {
|
|||||||
/// The `Fst` crate is used to associate terms to their
|
/// The `Fst` crate is used to associate terms to their
|
||||||
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
|
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
|
||||||
/// possible to fetch the associated `TermInfo`.
|
/// possible to fetch the associated `TermInfo`.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct TermDictionary {
|
pub struct TermDictionary {
|
||||||
fst_index: tantivy_fst::Map<OwnedBytes>,
|
fst_index: Arc<tantivy_fst::Map<OwnedBytes>>,
|
||||||
term_info_store: TermInfoStore,
|
term_info_store: TermInfoStore,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -136,7 +138,7 @@ impl TermDictionary {
|
|||||||
let fst_index = open_fst_index(fst_file_slice)?;
|
let fst_index = open_fst_index(fst_file_slice)?;
|
||||||
let term_info_store = TermInfoStore::open(values_file_slice)?;
|
let term_info_store = TermInfoStore::open(values_file_slice)?;
|
||||||
Ok(TermDictionary {
|
Ok(TermDictionary {
|
||||||
fst_index,
|
fst_index: Arc::new(fst_index),
|
||||||
term_info_store,
|
term_info_store,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,6 +74,7 @@ const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;
|
|||||||
|
|
||||||
// TODO in the future this should become an enum of supported dictionaries
|
// TODO in the future this should become an enum of supported dictionaries
|
||||||
/// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
|
/// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct TermDictionary(InnerTermDict);
|
pub struct TermDictionary(InnerTermDict);
|
||||||
|
|
||||||
impl TermDictionary {
|
impl TermDictionary {
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermInfoValueWriter>;
|
|||||||
pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;
|
pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;
|
||||||
|
|
||||||
/// SSTable used to store TermInfo objects.
|
/// SSTable used to store TermInfo objects.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct TermSSTable;
|
pub struct TermSSTable;
|
||||||
|
|
||||||
pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;
|
pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ description = "sstables for tantivy"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
common = {version= "0.7", path="../common", package="tantivy-common"}
|
common = {version= "0.7", path="../common", package="tantivy-common"}
|
||||||
|
futures-util = "0.3.30"
|
||||||
|
itertools = "0.13.0"
|
||||||
tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
|
tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
|
||||||
tantivy-fst = "0.5"
|
tantivy-fst = "0.5"
|
||||||
# experimental gives us access to Decompressor::upper_bound
|
# experimental gives us access to Decompressor::upper_bound
|
||||||
|
|||||||
271
sstable/src/block_match_automaton.rs
Normal file
271
sstable/src/block_match_automaton.rs
Normal file
@@ -0,0 +1,271 @@
|
|||||||
|
use tantivy_fst::Automaton;
|
||||||
|
|
||||||
|
/// Returns whether a block can match an automaton based on its bounds.
|
||||||
|
///
|
||||||
|
/// start key is exclusive, and optional to account for the first block. end key is inclusive and
|
||||||
|
/// mandatory.
|
||||||
|
pub(crate) fn can_block_match_automaton(
|
||||||
|
start_key_opt: Option<&[u8]>,
|
||||||
|
end_key: &[u8],
|
||||||
|
automaton: &impl Automaton,
|
||||||
|
) -> bool {
|
||||||
|
let start_key = if let Some(start_key) = start_key_opt {
|
||||||
|
start_key
|
||||||
|
} else {
|
||||||
|
// if start_key_opt is None, we would allow an automaton matching the empty string to match
|
||||||
|
if automaton.is_match(&automaton.start()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
&[]
|
||||||
|
};
|
||||||
|
can_block_match_automaton_with_start(start_key, end_key, automaton)
|
||||||
|
}
|
||||||
|
|
||||||
|
// similar to can_block_match_automaton, ignoring the edge case of the initial block
|
||||||
|
fn can_block_match_automaton_with_start(
|
||||||
|
start_key: &[u8],
|
||||||
|
end_key: &[u8],
|
||||||
|
automaton: &impl Automaton,
|
||||||
|
) -> bool {
|
||||||
|
// notation: in loops, we use `kb` to denotate a key byte (a byte taken from the start/end key),
|
||||||
|
// and `rb`, a range byte (usually all values higher than a `kb` when comparing with
|
||||||
|
// start_key, or all values lower than a `kb` when comparing with end_key)
|
||||||
|
|
||||||
|
if start_key >= end_key {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let common_prefix_len = crate::common_prefix_len(start_key, end_key);
|
||||||
|
|
||||||
|
let mut base_state = automaton.start();
|
||||||
|
for kb in &start_key[0..common_prefix_len] {
|
||||||
|
base_state = automaton.accept(&base_state, *kb);
|
||||||
|
}
|
||||||
|
|
||||||
|
// this is not required for correctness, but allows dodging more expensive checks
|
||||||
|
if !automaton.can_match(&base_state) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we have 3 distinct case:
|
||||||
|
// - keys are `abc` and `abcd` => we test for abc[\0-d].*
|
||||||
|
// - keys are `abcd` and `abce` => we test for abc[d-e].*
|
||||||
|
// - keys are `abcd` and `abc` => contradiction with start_key < end_key.
|
||||||
|
//
|
||||||
|
// ideally for (abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?)
|
||||||
|
// but let's start simple (and correct), and tighten our bounds latter
|
||||||
|
//
|
||||||
|
// and for (abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?)
|
||||||
|
// abc (
|
||||||
|
// d(e.+|[f-\xff].*) |
|
||||||
|
// e.* |
|
||||||
|
// f([\0-f].*|g)?
|
||||||
|
// )
|
||||||
|
//
|
||||||
|
// these are all written as regex, but can be converted to operations we can do:
|
||||||
|
// - [x-y] is a for c in x..=y
|
||||||
|
// - .* is a can_match()
|
||||||
|
// - .+ is a for c in 0..=255 { accept(c).can_match() }
|
||||||
|
// - ? is a the thing before can_match(), or current state.is_match()
|
||||||
|
// - | means test both side
|
||||||
|
|
||||||
|
// we have two cases, either start_key is a prefix of end_key (e.g. (abc, abcjp]),
|
||||||
|
// or it is not (e.g. (abcdg, abcjp]). It is not possible however that end_key be a prefix of
|
||||||
|
// start_key (or that both are equal) because we already handled start_key >= end_key.
|
||||||
|
//
|
||||||
|
// if we are in the first case, we want to visit the following states:
|
||||||
|
// abc (
|
||||||
|
// [\0-i].* |
|
||||||
|
// j (
|
||||||
|
// [\0-o].* |
|
||||||
|
// p
|
||||||
|
// )?
|
||||||
|
// )
|
||||||
|
// Everything after `abc` is handled by `match_range_end`
|
||||||
|
//
|
||||||
|
// if we are in the 2nd case, we want to visit the following states:
|
||||||
|
// abc (
|
||||||
|
// d(g.+|[h-\xff].*) | // this is handled by match_range_start
|
||||||
|
//
|
||||||
|
// [e-i].* | // this is handled here
|
||||||
|
//
|
||||||
|
// j ( // this is handled by match_range_end (but countrary to the other
|
||||||
|
// [\0-o].* | // case, j is already consumed so to not check [\0-i].* )
|
||||||
|
// p
|
||||||
|
// )?
|
||||||
|
// )
|
||||||
|
|
||||||
|
let Some(start_range) = start_key.get(common_prefix_len) else {
|
||||||
|
return match_range_end(&end_key[common_prefix_len..], &automaton, base_state);
|
||||||
|
};
|
||||||
|
|
||||||
|
let end_range = end_key[common_prefix_len];
|
||||||
|
|
||||||
|
// things starting with start_range were handled in match_range_start
|
||||||
|
// this starting with end_range are handled bellow.
|
||||||
|
// this can run for 0 iteration in cases such as (abc, abd]
|
||||||
|
for rb in (start_range + 1)..end_range {
|
||||||
|
let new_state = automaton.accept(&base_state, rb);
|
||||||
|
if automaton.can_match(&new_state) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let state_for_start = automaton.accept(&base_state, *start_range);
|
||||||
|
if match_range_start(
|
||||||
|
&start_key[common_prefix_len + 1..],
|
||||||
|
&automaton,
|
||||||
|
state_for_start,
|
||||||
|
) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
let state_for_end = automaton.accept(&base_state, end_range);
|
||||||
|
if automaton.is_match(&state_for_end) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
match_range_end(&end_key[common_prefix_len + 1..], &automaton, state_for_end)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn match_range_start<S, A: Automaton<State = S>>(
|
||||||
|
start_key: &[u8],
|
||||||
|
automaton: &A,
|
||||||
|
mut state: S,
|
||||||
|
) -> bool {
|
||||||
|
// case (abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
|
||||||
|
// - [h-\xff].*
|
||||||
|
// - g[k-\xff].*
|
||||||
|
// - gj.+ == gf[\0-\xff].*
|
||||||
|
|
||||||
|
for kb in start_key {
|
||||||
|
// this is an optimisation, and is not needed for correctness
|
||||||
|
if !automaton.can_match(&state) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// does the [h-\xff].* part. we skip if kb==255 as [\{0100}-\xff] is an empty range, and
|
||||||
|
// this would overflow in our u8 world
|
||||||
|
if *kb < u8::MAX {
|
||||||
|
for rb in (kb + 1)..=u8::MAX {
|
||||||
|
let temp_state = automaton.accept(&state, rb);
|
||||||
|
if automaton.can_match(&temp_state) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// push g
|
||||||
|
state = automaton.accept(&state, *kb);
|
||||||
|
}
|
||||||
|
|
||||||
|
// this isn't required for correctness, but can save us from looping 256 below
|
||||||
|
if !automaton.can_match(&state) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// does the final `.+`, which is the same as `[\0-\xff].*`
|
||||||
|
for rb in 0..=u8::MAX {
|
||||||
|
let temp_state = automaton.accept(&state, rb);
|
||||||
|
if automaton.can_match(&temp_state) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
fn match_range_end<S, A: Automaton<State = S>>(
|
||||||
|
end_key: &[u8],
|
||||||
|
automaton: &A,
|
||||||
|
mut state: S,
|
||||||
|
) -> bool {
|
||||||
|
// for (abcdef, abcmps]. the prefix `abcm` has been consumed, `[d-l].*` was handled elsewhere,
|
||||||
|
// we just need to handle
|
||||||
|
// - [\0-o].*
|
||||||
|
// - p
|
||||||
|
// - p[\0-r].*
|
||||||
|
// - ps
|
||||||
|
for kb in end_key {
|
||||||
|
// this is an optimisation, and is not needed for correctness
|
||||||
|
if !automaton.can_match(&state) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// does the `[\0-o].*`
|
||||||
|
for rb in 0..*kb {
|
||||||
|
let temp_state = automaton.accept(&state, rb);
|
||||||
|
if automaton.can_match(&temp_state) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// push p
|
||||||
|
state = automaton.accept(&state, *kb);
|
||||||
|
// verify the `p` case
|
||||||
|
if automaton.is_match(&state) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) mod tests {
|
||||||
|
use proptest::prelude::*;
|
||||||
|
use tantivy_fst::Automaton;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
pub(crate) struct EqBuffer(pub Vec<u8>);
|
||||||
|
|
||||||
|
impl Automaton for EqBuffer {
|
||||||
|
type State = Option<usize>;
|
||||||
|
|
||||||
|
fn start(&self) -> Self::State {
|
||||||
|
Some(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_match(&self, state: &Self::State) -> bool {
|
||||||
|
*state == Some(self.0.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn accept(&self, state: &Self::State, byte: u8) -> Self::State {
|
||||||
|
state
|
||||||
|
.filter(|pos| self.0.get(*pos) == Some(&byte))
|
||||||
|
.map(|pos| pos + 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn can_match(&self, state: &Self::State) -> bool {
|
||||||
|
state.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn will_always_match(&self, _state: &Self::State) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn gen_key_strategy() -> impl Strategy<Value = Vec<u8>> {
|
||||||
|
// we only generate bytes in [0, 1, 2, 254, 255] to reduce the search space without
|
||||||
|
// ignoring edge cases that might ocure with integer over/underflow
|
||||||
|
proptest::collection::vec(prop_oneof![0u8..=2, 254u8..=255], 0..5)
|
||||||
|
}
|
||||||
|
|
||||||
|
proptest! {
|
||||||
|
#![proptest_config(ProptestConfig {
|
||||||
|
cases: 10000, .. ProptestConfig::default()
|
||||||
|
})]
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_proptest_automaton_match_block(start in gen_key_strategy(), end in gen_key_strategy(), key in gen_key_strategy()) {
|
||||||
|
let expected = start < key && end >= key;
|
||||||
|
let automaton = EqBuffer(key);
|
||||||
|
|
||||||
|
assert_eq!(can_block_match_automaton(Some(&start), &end, &automaton), expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_proptest_automaton_match_first_block(end in gen_key_strategy(), key in gen_key_strategy()) {
|
||||||
|
let expected = end >= key;
|
||||||
|
let automaton = EqBuffer(key);
|
||||||
|
assert_eq!(can_block_match_automaton(None, &end, &automaton), expected);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ use zstd::bulk::Decompressor;
|
|||||||
pub struct BlockReader {
|
pub struct BlockReader {
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
reader: OwnedBytes,
|
reader: OwnedBytes,
|
||||||
|
next_readers: std::vec::IntoIter<OwnedBytes>,
|
||||||
offset: usize,
|
offset: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -15,6 +16,18 @@ impl BlockReader {
|
|||||||
BlockReader {
|
BlockReader {
|
||||||
buffer: Vec::new(),
|
buffer: Vec::new(),
|
||||||
reader,
|
reader,
|
||||||
|
next_readers: Vec::new().into_iter(),
|
||||||
|
offset: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_multiple_blocks(readers: Vec<OwnedBytes>) -> BlockReader {
|
||||||
|
let mut next_readers = readers.into_iter();
|
||||||
|
let reader = next_readers.next().unwrap_or_else(OwnedBytes::empty);
|
||||||
|
BlockReader {
|
||||||
|
buffer: Vec::new(),
|
||||||
|
reader,
|
||||||
|
next_readers,
|
||||||
offset: 0,
|
offset: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -34,42 +47,52 @@ impl BlockReader {
|
|||||||
self.offset = 0;
|
self.offset = 0;
|
||||||
self.buffer.clear();
|
self.buffer.clear();
|
||||||
|
|
||||||
let block_len = match self.reader.len() {
|
loop {
|
||||||
0 => return Ok(false),
|
let block_len = match self.reader.len() {
|
||||||
1..=3 => {
|
0 => {
|
||||||
|
// we are out of data for this block. Check if we have another block after
|
||||||
|
if let Some(new_reader) = self.next_readers.next() {
|
||||||
|
self.reader = new_reader;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
1..=3 => {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::UnexpectedEof,
|
||||||
|
"failed to read block_len",
|
||||||
|
))
|
||||||
|
}
|
||||||
|
_ => self.reader.read_u32() as usize,
|
||||||
|
};
|
||||||
|
if block_len <= 1 {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
let compress = self.reader.read_u8();
|
||||||
|
let block_len = block_len - 1;
|
||||||
|
|
||||||
|
if self.reader.len() < block_len {
|
||||||
return Err(io::Error::new(
|
return Err(io::Error::new(
|
||||||
io::ErrorKind::UnexpectedEof,
|
io::ErrorKind::UnexpectedEof,
|
||||||
"failed to read block_len",
|
"failed to read block content",
|
||||||
))
|
));
|
||||||
}
|
}
|
||||||
_ => self.reader.read_u32() as usize,
|
if compress == 1 {
|
||||||
};
|
let required_capacity =
|
||||||
if block_len <= 1 {
|
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
|
||||||
return Ok(false);
|
self.buffer.reserve(required_capacity);
|
||||||
}
|
Decompressor::new()?
|
||||||
let compress = self.reader.read_u8();
|
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
|
||||||
let block_len = block_len - 1;
|
|
||||||
|
|
||||||
if self.reader.len() < block_len {
|
self.reader.advance(block_len);
|
||||||
return Err(io::Error::new(
|
} else {
|
||||||
io::ErrorKind::UnexpectedEof,
|
self.buffer.resize(block_len, 0u8);
|
||||||
"failed to read block content",
|
self.reader.read_exact(&mut self.buffer[..])?;
|
||||||
));
|
}
|
||||||
}
|
|
||||||
if compress == 1 {
|
|
||||||
let required_capacity =
|
|
||||||
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
|
|
||||||
self.buffer.reserve(required_capacity);
|
|
||||||
Decompressor::new()?
|
|
||||||
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
|
|
||||||
|
|
||||||
self.reader.advance(block_len);
|
return Ok(true);
|
||||||
} else {
|
|
||||||
self.buffer.resize(block_len, 0u8);
|
|
||||||
self.reader.read_exact(&mut self.buffer[..])?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(true)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
|
|||||||
@@ -143,6 +143,16 @@ where TValueReader: value::ValueReader
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn from_multiple_blocks(reader: Vec<OwnedBytes>) -> Self {
|
||||||
|
DeltaReader {
|
||||||
|
idx: 0,
|
||||||
|
common_prefix_len: 0,
|
||||||
|
suffix_range: 0..0,
|
||||||
|
value_reader: TValueReader::default(),
|
||||||
|
block_reader: BlockReader::from_multiple_blocks(reader),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn empty() -> Self {
|
pub fn empty() -> Self {
|
||||||
DeltaReader::new(OwnedBytes::empty())
|
DeltaReader::new(OwnedBytes::empty())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ use std::sync::Arc;
|
|||||||
use common::bounds::{transform_bound_inner_res, TransformBound};
|
use common::bounds::{transform_bound_inner_res, TransformBound};
|
||||||
use common::file_slice::FileSlice;
|
use common::file_slice::FileSlice;
|
||||||
use common::{BinarySerializable, OwnedBytes};
|
use common::{BinarySerializable, OwnedBytes};
|
||||||
|
use futures_util::{stream, StreamExt, TryStreamExt};
|
||||||
|
use itertools::Itertools;
|
||||||
use tantivy_fst::automaton::AlwaysMatch;
|
use tantivy_fst::automaton::AlwaysMatch;
|
||||||
use tantivy_fst::Automaton;
|
use tantivy_fst::Automaton;
|
||||||
|
|
||||||
@@ -98,20 +100,52 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
|||||||
&self,
|
&self,
|
||||||
key_range: impl RangeBounds<[u8]>,
|
key_range: impl RangeBounds<[u8]>,
|
||||||
limit: Option<u64>,
|
limit: Option<u64>,
|
||||||
|
automaton: &impl Automaton,
|
||||||
|
merge_holes_under_bytes: usize,
|
||||||
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
|
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
|
||||||
let slice = self.file_slice_for_range(key_range, limit);
|
let match_all = automaton.will_always_match(&automaton.start());
|
||||||
let data = slice.read_bytes_async().await?;
|
if match_all {
|
||||||
Ok(TSSTable::delta_reader(data))
|
let slice = self.file_slice_for_range(key_range, limit);
|
||||||
|
let data = slice.read_bytes_async().await?;
|
||||||
|
Ok(TSSTable::delta_reader(data))
|
||||||
|
} else {
|
||||||
|
let blocks = stream::iter(self.get_block_iterator_for_range_and_automaton(
|
||||||
|
key_range,
|
||||||
|
automaton,
|
||||||
|
merge_holes_under_bytes,
|
||||||
|
));
|
||||||
|
let data = blocks
|
||||||
|
.map(|block_addr| {
|
||||||
|
self.sstable_slice
|
||||||
|
.read_bytes_slice_async(block_addr.byte_range)
|
||||||
|
})
|
||||||
|
.buffered(5)
|
||||||
|
.try_collect::<Vec<_>>()
|
||||||
|
.await?;
|
||||||
|
Ok(DeltaReader::from_multiple_blocks(data))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn sstable_delta_reader_for_key_range(
|
pub(crate) fn sstable_delta_reader_for_key_range(
|
||||||
&self,
|
&self,
|
||||||
key_range: impl RangeBounds<[u8]>,
|
key_range: impl RangeBounds<[u8]>,
|
||||||
limit: Option<u64>,
|
limit: Option<u64>,
|
||||||
|
automaton: &impl Automaton,
|
||||||
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
|
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
|
||||||
let slice = self.file_slice_for_range(key_range, limit);
|
let match_all = automaton.will_always_match(&automaton.start());
|
||||||
let data = slice.read_bytes()?;
|
if match_all {
|
||||||
Ok(TSSTable::delta_reader(data))
|
let slice = self.file_slice_for_range(key_range, limit);
|
||||||
|
let data = slice.read_bytes()?;
|
||||||
|
Ok(TSSTable::delta_reader(data))
|
||||||
|
} else {
|
||||||
|
// if operations are sync, we assume latency is almost null, and there is no point in
|
||||||
|
// merging accross holes
|
||||||
|
let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton, 0);
|
||||||
|
let data = blocks
|
||||||
|
.map(|block_addr| self.sstable_slice.read_bytes_slice(block_addr.byte_range))
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
Ok(DeltaReader::from_multiple_blocks(data))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn sstable_delta_reader_block(
|
pub(crate) fn sstable_delta_reader_block(
|
||||||
@@ -204,6 +238,42 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
|||||||
self.sstable_slice.slice((start_bound, end_bound))
|
self.sstable_slice.slice((start_bound, end_bound))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_block_iterator_for_range_and_automaton<'a>(
|
||||||
|
&'a self,
|
||||||
|
key_range: impl RangeBounds<[u8]>,
|
||||||
|
automaton: &'a impl Automaton,
|
||||||
|
merge_holes_under_bytes: usize,
|
||||||
|
) -> impl Iterator<Item = BlockAddr> + 'a {
|
||||||
|
let lower_bound = match key_range.start_bound() {
|
||||||
|
Bound::Included(key) | Bound::Excluded(key) => {
|
||||||
|
self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX)
|
||||||
|
}
|
||||||
|
Bound::Unbounded => 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
let upper_bound = match key_range.end_bound() {
|
||||||
|
Bound::Included(key) | Bound::Excluded(key) => {
|
||||||
|
self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX)
|
||||||
|
}
|
||||||
|
Bound::Unbounded => u64::MAX,
|
||||||
|
};
|
||||||
|
let block_range = lower_bound..=upper_bound;
|
||||||
|
self.sstable_index
|
||||||
|
.get_block_for_automaton(automaton)
|
||||||
|
.filter(move |(block_id, _)| block_range.contains(block_id))
|
||||||
|
.map(|(_, block_addr)| block_addr)
|
||||||
|
.coalesce(move |first, second| {
|
||||||
|
if first.byte_range.end + merge_holes_under_bytes >= second.byte_range.start {
|
||||||
|
Ok(BlockAddr {
|
||||||
|
first_ordinal: first.first_ordinal,
|
||||||
|
byte_range: first.byte_range.start..second.byte_range.end,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Err((first, second))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Opens a `TermDictionary`.
|
/// Opens a `TermDictionary`.
|
||||||
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
|
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
|
||||||
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
|
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ use std::ops::Range;
|
|||||||
|
|
||||||
use merge::ValueMerger;
|
use merge::ValueMerger;
|
||||||
|
|
||||||
|
mod block_match_automaton;
|
||||||
mod delta;
|
mod delta;
|
||||||
mod dictionary;
|
mod dictionary;
|
||||||
pub mod merge;
|
pub mod merge;
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
use common::OwnedBytes;
|
use common::OwnedBytes;
|
||||||
|
use tantivy_fst::Automaton;
|
||||||
|
|
||||||
|
use crate::block_match_automaton::can_block_match_automaton;
|
||||||
use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal};
|
use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal};
|
||||||
|
|
||||||
#[derive(Default, Debug, Clone)]
|
#[derive(Default, Debug, Clone)]
|
||||||
pub struct SSTableIndex {
|
pub struct SSTableIndex {
|
||||||
blocks: Vec<BlockMeta>,
|
pub(crate) blocks: Vec<BlockMeta>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SSTableIndex {
|
impl SSTableIndex {
|
||||||
@@ -74,6 +76,31 @@ impl SSTableIndex {
|
|||||||
// locate_with_ord always returns an index within range
|
// locate_with_ord always returns an index within range
|
||||||
self.get_block(self.locate_with_ord(ord)).unwrap()
|
self.get_block(self.locate_with_ord(ord)).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_block_for_automaton<'a>(
|
||||||
|
&'a self,
|
||||||
|
automaton: &'a impl Automaton,
|
||||||
|
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
|
||||||
|
std::iter::once((None, &self.blocks[0]))
|
||||||
|
.chain(self.blocks.windows(2).map(|window| {
|
||||||
|
let [prev, curr] = window else {
|
||||||
|
unreachable!();
|
||||||
|
};
|
||||||
|
(Some(&*prev.last_key_or_greater), curr)
|
||||||
|
}))
|
||||||
|
.enumerate()
|
||||||
|
.filter_map(move |(pos, (prev_key, current_block))| {
|
||||||
|
if can_block_match_automaton(
|
||||||
|
prev_key,
|
||||||
|
¤t_block.last_key_or_greater,
|
||||||
|
automaton,
|
||||||
|
) {
|
||||||
|
Some((pos as u64, current_block.block_addr.clone()))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -99,3 +126,106 @@ impl SSTable for IndexSSTable {
|
|||||||
|
|
||||||
type ValueWriter = crate::value::index::IndexValueWriter;
|
type ValueWriter = crate::value::index::IndexValueWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::block_match_automaton::tests::EqBuffer;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_block_for_automaton() {
|
||||||
|
let sstable = SSTableIndex {
|
||||||
|
blocks: vec![
|
||||||
|
BlockMeta {
|
||||||
|
last_key_or_greater: vec![0, 1, 2],
|
||||||
|
block_addr: BlockAddr {
|
||||||
|
first_ordinal: 0,
|
||||||
|
byte_range: 0..10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
BlockMeta {
|
||||||
|
last_key_or_greater: vec![0, 2, 2],
|
||||||
|
block_addr: BlockAddr {
|
||||||
|
first_ordinal: 5,
|
||||||
|
byte_range: 10..20,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
BlockMeta {
|
||||||
|
last_key_or_greater: vec![0, 3, 2],
|
||||||
|
block_addr: BlockAddr {
|
||||||
|
first_ordinal: 10,
|
||||||
|
byte_range: 20..30,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&EqBuffer(vec![0, 1, 1]))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
res,
|
||||||
|
vec![(
|
||||||
|
0,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 0,
|
||||||
|
byte_range: 0..10
|
||||||
|
}
|
||||||
|
)]
|
||||||
|
);
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&EqBuffer(vec![0, 2, 1]))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
res,
|
||||||
|
vec![(
|
||||||
|
1,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 5,
|
||||||
|
byte_range: 10..20
|
||||||
|
}
|
||||||
|
)]
|
||||||
|
);
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&EqBuffer(vec![0, 3, 1]))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
res,
|
||||||
|
vec![(
|
||||||
|
2,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 10,
|
||||||
|
byte_range: 20..30
|
||||||
|
}
|
||||||
|
)]
|
||||||
|
);
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&EqBuffer(vec![0, 4, 1]))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert!(res.is_empty());
|
||||||
|
|
||||||
|
let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1]));
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&complex_automaton)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
res,
|
||||||
|
vec![
|
||||||
|
(
|
||||||
|
0,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 0,
|
||||||
|
byte_range: 0..10
|
||||||
|
}
|
||||||
|
),
|
||||||
|
(
|
||||||
|
2,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 10,
|
||||||
|
byte_range: 20..30
|
||||||
|
}
|
||||||
|
)
|
||||||
|
]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,8 +5,9 @@ use std::sync::Arc;
|
|||||||
use common::{BinarySerializable, FixedSize, OwnedBytes};
|
use common::{BinarySerializable, FixedSize, OwnedBytes};
|
||||||
use tantivy_bitpacker::{compute_num_bits, BitPacker};
|
use tantivy_bitpacker::{compute_num_bits, BitPacker};
|
||||||
use tantivy_fst::raw::Fst;
|
use tantivy_fst::raw::Fst;
|
||||||
use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer};
|
use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
|
||||||
|
|
||||||
|
use crate::block_match_automaton::can_block_match_automaton;
|
||||||
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
|
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -64,6 +65,41 @@ impl SSTableIndex {
|
|||||||
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
|
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_block_for_automaton<'a>(
|
||||||
|
&'a self,
|
||||||
|
automaton: &'a impl Automaton,
|
||||||
|
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
|
||||||
|
match self {
|
||||||
|
SSTableIndex::V2(v2_index) => {
|
||||||
|
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
|
||||||
|
}
|
||||||
|
SSTableIndex::V3(v3_index) => {
|
||||||
|
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
|
||||||
|
}
|
||||||
|
SSTableIndex::V3Empty(v3_empty) => {
|
||||||
|
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum BlockIter<V2, V3, T> {
|
||||||
|
V2(V2),
|
||||||
|
V3(V3),
|
||||||
|
V3Empty(std::iter::Once<T>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
|
||||||
|
type Item = T;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
match self {
|
||||||
|
BlockIter::V2(v2) => v2.next(),
|
||||||
|
BlockIter::V3(v3) => v3.next(),
|
||||||
|
BlockIter::V3Empty(once) => once.next(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -123,6 +159,59 @@ impl SSTableIndexV3 {
|
|||||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||||
self.block_addr_store.binary_search_ord(ord).1
|
self.block_addr_store.binary_search_ord(ord).1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_block_for_automaton<'a>(
|
||||||
|
&'a self,
|
||||||
|
automaton: &'a impl Automaton,
|
||||||
|
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
|
||||||
|
// this is more complicated than other index formats: we don't have a ready made list of
|
||||||
|
// blocks, and instead need to stream-decode the sstable.
|
||||||
|
|
||||||
|
GetBlockForAutomaton {
|
||||||
|
streamer: self.fst_index.stream(),
|
||||||
|
block_addr_store: &self.block_addr_store,
|
||||||
|
prev_key: None,
|
||||||
|
automaton,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO we iterate over the entire Map to find matching blocks,
|
||||||
|
// we could manually iterate on the underlying Fst and skip whole branches if our Automaton says
|
||||||
|
// cannot match. this isn't as bad as it sounds given the fst is a lot smaller than the rest of the
|
||||||
|
// sstable.
|
||||||
|
// To do that, we can't use tantivy_fst's Stream with an automaton, as we need to know 2 consecutive
|
||||||
|
// fst keys to form a proper opinion on whether this is a match, which we wan't translate into a
|
||||||
|
// single automaton
|
||||||
|
struct GetBlockForAutomaton<'a, A: Automaton> {
|
||||||
|
streamer: tantivy_fst::map::Stream<'a>,
|
||||||
|
block_addr_store: &'a BlockAddrStore,
|
||||||
|
prev_key: Option<Vec<u8>>,
|
||||||
|
automaton: &'a A,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
|
||||||
|
type Item = (u64, BlockAddr);
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
while let Some((new_key, block_id)) = self.streamer.next() {
|
||||||
|
if let Some(prev_key) = self.prev_key.as_mut() {
|
||||||
|
if can_block_match_automaton(Some(prev_key), new_key, self.automaton) {
|
||||||
|
prev_key.clear();
|
||||||
|
prev_key.extend_from_slice(new_key);
|
||||||
|
return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
|
||||||
|
}
|
||||||
|
prev_key.clear();
|
||||||
|
prev_key.extend_from_slice(new_key);
|
||||||
|
} else {
|
||||||
|
self.prev_key = Some(new_key.to_owned());
|
||||||
|
if can_block_match_automaton(None, new_key, self.automaton) {
|
||||||
|
return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -734,7 +823,8 @@ fn find_best_slope(elements: impl Iterator<Item = (usize, u64)> + Clone) -> (u32
|
|||||||
mod tests {
|
mod tests {
|
||||||
use common::OwnedBytes;
|
use common::OwnedBytes;
|
||||||
|
|
||||||
use super::{BlockAddr, SSTableIndexBuilder, SSTableIndexV3};
|
use super::*;
|
||||||
|
use crate::block_match_automaton::tests::EqBuffer;
|
||||||
use crate::SSTableDataCorruption;
|
use crate::SSTableDataCorruption;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -823,4 +913,108 @@ mod tests {
|
|||||||
(12345, 1)
|
(12345, 1)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_block_for_automaton() {
|
||||||
|
let sstable_index_builder = SSTableIndexBuilder {
|
||||||
|
blocks: vec![
|
||||||
|
BlockMeta {
|
||||||
|
last_key_or_greater: vec![0, 1, 2],
|
||||||
|
block_addr: BlockAddr {
|
||||||
|
first_ordinal: 0,
|
||||||
|
byte_range: 0..10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
BlockMeta {
|
||||||
|
last_key_or_greater: vec![0, 2, 2],
|
||||||
|
block_addr: BlockAddr {
|
||||||
|
first_ordinal: 5,
|
||||||
|
byte_range: 10..20,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
BlockMeta {
|
||||||
|
last_key_or_greater: vec![0, 3, 2],
|
||||||
|
block_addr: BlockAddr {
|
||||||
|
first_ordinal: 10,
|
||||||
|
byte_range: 20..30,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut sstable_index_bytes = Vec::new();
|
||||||
|
let fst_len = sstable_index_builder
|
||||||
|
.serialize(&mut sstable_index_bytes)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let sstable = SSTableIndexV3::load(OwnedBytes::new(sstable_index_bytes), fst_len).unwrap();
|
||||||
|
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&EqBuffer(vec![0, 1, 1]))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
res,
|
||||||
|
vec![(
|
||||||
|
0,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 0,
|
||||||
|
byte_range: 0..10
|
||||||
|
}
|
||||||
|
)]
|
||||||
|
);
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&EqBuffer(vec![0, 2, 1]))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
res,
|
||||||
|
vec![(
|
||||||
|
1,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 5,
|
||||||
|
byte_range: 10..20
|
||||||
|
}
|
||||||
|
)]
|
||||||
|
);
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&EqBuffer(vec![0, 3, 1]))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
res,
|
||||||
|
vec![(
|
||||||
|
2,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 10,
|
||||||
|
byte_range: 20..30
|
||||||
|
}
|
||||||
|
)]
|
||||||
|
);
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&EqBuffer(vec![0, 4, 1]))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert!(res.is_empty());
|
||||||
|
|
||||||
|
let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1]));
|
||||||
|
let res = sstable
|
||||||
|
.get_block_for_automaton(&complex_automaton)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert_eq!(
|
||||||
|
res,
|
||||||
|
vec![
|
||||||
|
(
|
||||||
|
0,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 0,
|
||||||
|
byte_range: 0..10
|
||||||
|
}
|
||||||
|
),
|
||||||
|
(
|
||||||
|
2,
|
||||||
|
BlockAddr {
|
||||||
|
first_ordinal: 10,
|
||||||
|
byte_range: 20..30
|
||||||
|
}
|
||||||
|
)
|
||||||
|
]
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -86,16 +86,24 @@ where
|
|||||||
bound_as_byte_slice(&self.upper),
|
bound_as_byte_slice(&self.upper),
|
||||||
);
|
);
|
||||||
self.term_dict
|
self.term_dict
|
||||||
.sstable_delta_reader_for_key_range(key_range, self.limit)
|
.sstable_delta_reader_for_key_range(key_range, self.limit, &self.automaton)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delta_reader_async(&self) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
|
async fn delta_reader_async(
|
||||||
|
&self,
|
||||||
|
merge_holes_under_bytes: usize,
|
||||||
|
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
|
||||||
let key_range = (
|
let key_range = (
|
||||||
bound_as_byte_slice(&self.lower),
|
bound_as_byte_slice(&self.lower),
|
||||||
bound_as_byte_slice(&self.upper),
|
bound_as_byte_slice(&self.upper),
|
||||||
);
|
);
|
||||||
self.term_dict
|
self.term_dict
|
||||||
.sstable_delta_reader_for_key_range_async(key_range, self.limit)
|
.sstable_delta_reader_for_key_range_async(
|
||||||
|
key_range,
|
||||||
|
self.limit,
|
||||||
|
&self.automaton,
|
||||||
|
merge_holes_under_bytes,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,7 +138,16 @@ where
|
|||||||
|
|
||||||
/// See `into_stream(..)`
|
/// See `into_stream(..)`
|
||||||
pub async fn into_stream_async(self) -> io::Result<Streamer<'a, TSSTable, A>> {
|
pub async fn into_stream_async(self) -> io::Result<Streamer<'a, TSSTable, A>> {
|
||||||
let delta_reader = self.delta_reader_async().await?;
|
self.into_stream_async_merging_holes(0).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same as `into_stream_async`, but tries to issue a single io operation when requesting
|
||||||
|
/// blocks that are not consecutive, but also less than `merge_holes_under_bytes` bytes appart.
|
||||||
|
pub async fn into_stream_async_merging_holes(
|
||||||
|
self,
|
||||||
|
merge_holes_under_bytes: usize,
|
||||||
|
) -> io::Result<Streamer<'a, TSSTable, A>> {
|
||||||
|
let delta_reader = self.delta_reader_async(merge_holes_under_bytes).await?;
|
||||||
self.into_stream_given_delta_reader(delta_reader)
|
self.into_stream_given_delta_reader(delta_reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -327,4 +344,7 @@ mod tests {
|
|||||||
assert!(!term_streamer.advance());
|
assert!(!term_streamer.advance());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO add test for sparse search with a block of poison (starts with 0xffffffff) => such a
|
||||||
|
// block instantly causes an unexpected EOF error
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user