allow warming up with an automaton

This commit is contained in:
trinity-1686a
2024-09-14 13:58:34 +02:00
parent 9e2ddec4b3
commit 24c5dc2398
4 changed files with 96 additions and 16 deletions

View File

@@ -170,7 +170,7 @@ impl ColumnarReader {
) -> io::Result<Vec<DynamicColumnHandle>> {
let stream = self
.stream_for_column_range(column_name)
.into_stream_async()
.into_stream_async(0)
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}

View File

@@ -3,6 +3,12 @@ use std::io;
use common::json_path_writer::JSON_END_OF_PATH;
use common::BinarySerializable;
use fnv::FnvHashSet;
#[cfg(feature = "quickwit")]
use futures_util::{FutureExt, StreamExt, TryStreamExt};
#[cfg(feature = "quickwit")]
use itertools::Itertools;
#[cfg(feature = "quickwit")]
use tantivy_fst::automaton::{AlwaysMatch, Automaton};
use crate::directory::FileSlice;
use crate::positions::PositionReader;
@@ -219,13 +225,18 @@ impl InvertedIndexReader {
self.termdict.get_async(term.serialized_value_bytes()).await
}
async fn get_term_range_async(
&self,
async fn get_term_range_async<'a, A: Automaton + 'a>(
&'a self,
terms: impl std::ops::RangeBounds<Term>,
automaton: A,
limit: Option<u64>,
) -> io::Result<impl Iterator<Item = TermInfo> + '_> {
merge_holes_under: usize,
) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
where
A::State: Clone,
{
use std::ops::Bound;
let range_builder = self.termdict.range();
let range_builder = self.termdict.search(automaton);
let range_builder = match terms.start_bound() {
Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
@@ -242,7 +253,7 @@ impl InvertedIndexReader {
range_builder
};
let mut stream = range_builder.into_stream_async().await?;
let mut stream = range_builder.into_stream_async(merge_holes_under).await?;
let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
@@ -288,7 +299,9 @@ impl InvertedIndexReader {
limit: Option<u64>,
with_positions: bool,
) -> io::Result<bool> {
let mut term_info = self.get_term_range_async(terms, limit).await?;
let mut term_info = self
.get_term_range_async(terms, AlwaysMatch, limit, 0)
.await?;
let Some(first_terminfo) = term_info.next() else {
// no key matches, nothing more to load
@@ -315,6 +328,55 @@ impl InvertedIndexReader {
Ok(true)
}
/// Warmup the block postings for all terms matching the given automaton.
/// This method is for an advanced usage only.
///
/// Returns a boolean indicating whether a term matching the automaton was found in the dictionary.
pub async fn warm_postings_automaton<A: Automaton + Clone>(
&self,
automaton: A,
// with_positions: bool, at the moment we have no use for it, and supporting it would add
// complexity to the coalesce
) -> io::Result<bool>
where
A::State: Clone,
{
// merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
// S3 (~80MiB/s, and 50ms latency)
let merge_holes_under = (80 * 1024 * 1024 * 50) / 1000;
// we build a first iterator to download everything. Simply calling the function already
// loads everything, but doesn't start iterating over the sstable.
let mut _term_info = self
.get_term_range_async(.., automaton.clone(), None, merge_holes_under)
.await?;
// we build a 2nd iterator, this one with no holes, so we don't go through blocks we can't
// match, and just download them to reduce our query count. This makes the assumption
// there is a caching layer below, which might not always be true, but is in Quickwit.
let term_info = self.get_term_range_async(.., automaton, None, 0).await?;
let range_to_load = term_info
.map(|term_info| term_info.postings_range)
.coalesce(|range1, range2| {
if range1.end + merge_holes_under >= range2.start {
Ok(range1.start..range2.end)
} else {
Err((range1, range2))
}
});
let slices_downloaded = futures_util::stream::iter(range_to_load)
.map(|posting_slice| {
self.postings_file_slice
.read_bytes_slice_async(posting_slice)
.map(|result| result.map(|_slice| ()))
})
.buffer_unordered(5)
.try_collect::<Vec<()>>()
.await?;
Ok(!slices_downloaded.is_empty())
}
/// Warmup the block postings for all terms.
/// This method is for an advanced usage only.
///

View File

@@ -101,6 +101,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
key_range: impl RangeBounds<[u8]>,
limit: Option<u64>,
automaton: &impl Automaton,
merge_holes_under: usize,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let match_all = automaton.will_always_match(&automaton.start());
if match_all {
@@ -108,8 +109,11 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let data = slice.read_bytes_async().await?;
Ok(TSSTable::delta_reader(data))
} else {
let blocks =
stream::iter(self.get_block_iterator_for_range_and_automaton(key_range, automaton));
let blocks = stream::iter(self.get_block_iterator_for_range_and_automaton(
key_range,
automaton,
merge_holes_under,
));
let data = blocks
.map(|block_addr| {
self.sstable_slice
@@ -134,7 +138,9 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let data = slice.read_bytes()?;
Ok(TSSTable::delta_reader(data))
} else {
let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton);
// if operations are sync, we assume latency is almost null, and there is no point in
// merging across holes
let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton, 0);
let data = blocks
.map(|block_addr| self.sstable_slice.read_bytes_slice(block_addr.byte_range))
.collect::<Result<Vec<_>, _>>()?;
@@ -236,6 +242,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
&'a self,
key_range: impl RangeBounds<[u8]>,
automaton: &'a impl Automaton,
merge_holes_under: usize,
) -> impl Iterator<Item = BlockAddr> + 'a {
let lower_bound = match key_range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => {
@@ -255,8 +262,8 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
.get_block_for_automaton(automaton)
.filter(move |(block_id, _)| block_range.contains(block_id))
.map(|(_, block_addr)| block_addr)
.coalesce(|first, second| {
if first.byte_range.end == second.byte_range.start {
.coalesce(move |first, second| {
if first.byte_range.end + merge_holes_under >= second.byte_range.start {
Ok(BlockAddr {
first_ordinal: first.first_ordinal,
byte_range: first.byte_range.start..second.byte_range.end,

View File

@@ -89,13 +89,21 @@ where
.sstable_delta_reader_for_key_range(key_range, self.limit, &self.automaton)
}
async fn delta_reader_async(&self) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
async fn delta_reader_async(
&self,
merge_holes_under: usize,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let key_range = (
bound_as_byte_slice(&self.lower),
bound_as_byte_slice(&self.upper),
);
self.term_dict
.sstable_delta_reader_for_key_range_async(key_range, self.limit, &self.automaton)
.sstable_delta_reader_for_key_range_async(
key_range,
self.limit,
&self.automaton,
merge_holes_under,
)
.await
}
@@ -129,8 +137,11 @@ where
}
/// See `into_stream(..)`
pub async fn into_stream_async(self) -> io::Result<Streamer<'a, TSSTable, A>> {
let delta_reader = self.delta_reader_async().await?;
pub async fn into_stream_async(
self,
merge_holes_undex: usize,
) -> io::Result<Streamer<'a, TSSTable, A>> {
let delta_reader = self.delta_reader_async(merge_holes_undex).await?;
self.into_stream_given_delta_reader(delta_reader)
}