diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs
index f850e4f6f..832fdb7da 100644
--- a/columnar/src/columnar/reader/mod.rs
+++ b/columnar/src/columnar/reader/mod.rs
@@ -170,7 +170,7 @@ impl ColumnarReader {
     ) -> io::Result<Vec<DynamicColumnHandle>> {
         let stream = self
             .stream_for_column_range(column_name)
-            .into_stream_async()
+            .into_stream_async(0)
             .await?;
         read_all_columns_in_stream(stream, &self.column_data, self.format_version)
     }
diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs
index 30685eaa4..5781f9df3 100644
--- a/src/index/inverted_index_reader.rs
+++ b/src/index/inverted_index_reader.rs
@@ -3,6 +3,12 @@ use std::io;
 use common::json_path_writer::JSON_END_OF_PATH;
 use common::BinarySerializable;
 use fnv::FnvHashSet;
+#[cfg(feature = "quickwit")]
+use futures_util::{FutureExt, StreamExt, TryStreamExt};
+#[cfg(feature = "quickwit")]
+use itertools::Itertools;
+#[cfg(feature = "quickwit")]
+use tantivy_fst::automaton::{AlwaysMatch, Automaton};
 
 use crate::directory::FileSlice;
 use crate::positions::PositionReader;
@@ -219,13 +225,18 @@ impl InvertedIndexReader {
         self.termdict.get_async(term.serialized_value_bytes()).await
     }
 
-    async fn get_term_range_async(
-        &self,
+    async fn get_term_range_async<'a, A: Automaton + 'a>(
+        &'a self,
         terms: impl std::ops::RangeBounds<Term>,
+        automaton: A,
         limit: Option<u64>,
-    ) -> io::Result<impl Iterator<Item = TermInfo> + '_> {
+        merge_holes_under: usize,
+    ) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
+    where
+        A::State: Clone,
+    {
         use std::ops::Bound;
-        let range_builder = self.termdict.range();
+        let range_builder = self.termdict.search(automaton);
         let range_builder = match terms.start_bound() {
             Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
             Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
@@ -242,7 +253,7 @@
             range_builder
         };
 
-        let mut stream = range_builder.into_stream_async().await?;
+        let mut stream = range_builder.into_stream_async(merge_holes_under).await?;
 
         let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
 
@@ -288,7 +299,9 @@
         limit: Option<u64>,
         with_positions: bool,
     ) -> io::Result<bool> {
-        let mut term_info = self.get_term_range_async(terms, limit).await?;
+        let mut term_info = self
+            .get_term_range_async(terms, AlwaysMatch, limit, 0)
+            .await?;
 
         let Some(first_terminfo) = term_info.next() else {
             // no key matches, nothing more to load
@@ -315,6 +328,55 @@
         Ok(true)
     }
 
+    /// Warm up the block postings for all terms matching the given automaton.
+    /// This method is for an advanced usage only.
+    ///
+    /// Returns whether a term matching the automaton was found in the dictionary.
+    pub async fn warm_postings_automaton<A: Automaton + Clone>(
+        &self,
+        automaton: A,
+        // with_positions: bool, at the moment we have no use for it, and supporting it would add
+        // complexity to the coalesce
+    ) -> io::Result<bool>
+    where
+        A::State: Clone,
+    {
+        // merge holes under 4MiB, that's how many bytes we can hope to receive during the time
+        // to first byte from S3 (~80MiB/s bandwidth, 50ms latency)
+        let merge_holes_under = (80 * 1024 * 1024 * 50) / 1000;
+        // we build a first iterator to download everything. Simply calling the function already
+        // loads everything, but doesn't start iterating over the sstable.
+        let mut _term_info = self
+            .get_term_range_async(.., automaton.clone(), None, merge_holes_under)
+            .await?;
+        // we build a second iterator, this one with no holes, so we don't go through blocks we
+        // can't match, and just download them to reduce our query count. This makes the assumption
+        // that there is a caching layer below, which might not always be true, but is in Quickwit.
+        let term_info = self.get_term_range_async(.., automaton, None, 0).await?;
+
+        let range_to_load = term_info
+            .map(|term_info| term_info.postings_range)
+            .coalesce(|range1, range2| {
+                if range1.end + merge_holes_under >= range2.start {
+                    Ok(range1.start..range2.end)
+                } else {
+                    Err((range1, range2))
+                }
+            });
+
+        let slices_downloaded = futures_util::stream::iter(range_to_load)
+            .map(|posting_slice| {
+                self.postings_file_slice
+                    .read_bytes_slice_async(posting_slice)
+                    .map(|result| result.map(|_slice| ()))
+            })
+            .buffer_unordered(5)
+            .try_collect::<Vec<()>>()
+            .await?;
+
+        Ok(!slices_downloaded.is_empty())
+    }
+
     /// Warmup the block postings for all terms.
     /// This method is for an advanced usage only.
     ///
diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs
index b98513788..b351d64af 100644
--- a/sstable/src/dictionary.rs
+++ b/sstable/src/dictionary.rs
@@ -101,6 +101,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         key_range: impl RangeBounds<[u8]>,
         limit: Option<u64>,
         automaton: &impl Automaton,
+        merge_holes_under: usize,
     ) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
         let match_all = automaton.will_always_match(&automaton.start());
         if match_all {
@@ -108,8 +109,11 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
             let data = slice.read_bytes_async().await?;
             Ok(TSSTable::delta_reader(data))
         } else {
-            let blocks =
-                stream::iter(self.get_block_iterator_for_range_and_automaton(key_range, automaton));
+            let blocks = stream::iter(self.get_block_iterator_for_range_and_automaton(
+                key_range,
+                automaton,
+                merge_holes_under,
+            ));
             let data = blocks
                 .map(|block_addr| {
                     self.sstable_slice
@@ -134,7 +138,9 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
             let data = slice.read_bytes()?;
             Ok(TSSTable::delta_reader(data))
         } else {
-            let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton);
+            // if operations are sync, we assume latency is negligible, and there is no point in
+            // merging across holes
+            let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton, 0);
             let data = blocks
                 .map(|block_addr| self.sstable_slice.read_bytes_slice(block_addr.byte_range))
                 .collect::<Result<Vec<_>, _>>()?;
@@ -236,6 +242,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         &'a self,
         key_range: impl RangeBounds<[u8]>,
         automaton: &'a impl Automaton,
+        merge_holes_under: usize,
     ) -> impl Iterator<Item = BlockAddr> + 'a {
         let lower_bound = match key_range.start_bound() {
             Bound::Included(key) | Bound::Excluded(key) => {
@@ -255,8 +262,8 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
             .get_block_for_automaton(automaton)
             .filter(move |(block_id, _)| block_range.contains(block_id))
             .map(|(_, block_addr)| block_addr)
-            .coalesce(|first, second| {
-                if first.byte_range.end == second.byte_range.start {
+            .coalesce(move |first, second| {
+                if first.byte_range.end + merge_holes_under >= second.byte_range.start {
                     Ok(BlockAddr {
                         first_ordinal: first.first_ordinal,
                         byte_range: first.byte_range.start..second.byte_range.end,
diff --git a/sstable/src/streamer.rs b/sstable/src/streamer.rs
index ca5206fb9..e1c34a538 100644
--- a/sstable/src/streamer.rs
+++ b/sstable/src/streamer.rs
@@ -89,13 +89,21 @@ where
             .sstable_delta_reader_for_key_range(key_range, self.limit, &self.automaton)
     }
 
-    async fn delta_reader_async(&self) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
+    async fn delta_reader_async(
+        &self,
+        merge_holes_under: usize,
+    ) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
         let key_range = (
             bound_as_byte_slice(&self.lower),
             bound_as_byte_slice(&self.upper),
         );
         self.term_dict
-            .sstable_delta_reader_for_key_range_async(key_range, self.limit, &self.automaton)
+            .sstable_delta_reader_for_key_range_async(
+                key_range,
+                self.limit,
+                &self.automaton,
+                merge_holes_under,
+            )
             .await
     }
 
@@ -129,8 +137,11 @@ where
     }
 
     /// See `into_stream(..)`
-    pub async fn into_stream_async(self) -> io::Result<Streamer<'a, TSSTable, A>> {
-        let delta_reader = self.delta_reader_async().await?;
+    pub async fn into_stream_async(
+        self,
+        merge_holes_under: usize,
+    ) -> io::Result<Streamer<'a, TSSTable, A>> {
+        let delta_reader = self.delta_reader_async(merge_holes_under).await?;
         self.into_stream_given_delta_reader(delta_reader)
     }
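
Usage sketch (reviewer note, not part of the patch): `warm_postings_automaton` lets a caller prefetch every posting list whose term matches a `tantivy_fst` automaton, with nearby byte ranges coalesced so an object store like S3 serves a few large reads instead of one read per term. The `PrefixAutomaton` type and the surrounding setup below are illustrative assumptions; only `warm_postings_automaton` and the `Automaton` trait come from the changes above.

    use tantivy_fst::Automaton;

    /// Toy automaton accepting every term that starts with `prefix`.
    #[derive(Clone)]
    struct PrefixAutomaton {
        prefix: Vec<u8>,
    }

    impl Automaton for PrefixAutomaton {
        // Number of prefix bytes matched so far; None once the term has diverged.
        type State = Option<usize>;

        fn start(&self) -> Self::State {
            Some(0)
        }

        fn is_match(&self, state: &Self::State) -> bool {
            // A term matches once the whole prefix has been consumed.
            *state == Some(self.prefix.len())
        }

        fn can_match(&self, state: &Self::State) -> bool {
            // A None state can never reach a match, letting whole sstable blocks be skipped.
            state.is_some()
        }

        fn accept(&self, state: &Self::State, byte: u8) -> Self::State {
            match *state {
                // Prefix fully consumed: every longer term still matches.
                Some(n) if n == self.prefix.len() => Some(n),
                // Still inside the prefix: advance only on the expected byte.
                Some(n) if self.prefix[n] == byte => Some(n + 1),
                _ => None,
            }
        }
    }

    // `inverted_index` is assumed to be an InvertedIndexReader for one segment,
    // e.g. obtained via SegmentReader::inverted_index(field). This warms the
    // postings of every term starting with "tant" and reports whether any exist.
    let automaton = PrefixAutomaton { prefix: b"tant".to_vec() };
    let found = inverted_index.warm_postings_automaton(automaton).await?;

Any automaton whose `State` is `Clone` (e.g. `tantivy_fst::Regex`) can be passed the same way; the 4MiB `merge_holes_under` threshold then trades a little wasted bandwidth inside the merged ranges for far fewer requests.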