From 9c93bfeb516a3d3827785e4fc8d7358753bdced4 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 21 Apr 2023 11:23:09 +0200 Subject: [PATCH] optimise warmup code path (#2007) * optimise warmup code path * better function naming --- Cargo.toml | 1 + src/core/inverted_index_reader.rs | 15 +++--- sstable/src/dictionary.rs | 88 ++++++++++++++++++------------- 3 files changed, 62 insertions(+), 42 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7ecba5302..66b4029c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ itertools = "0.10.3" measure_time = "0.8.2" async-trait = "0.1.53" arc-swap = "1.5.0" +futures = "0.3.28" columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" } sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true } diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index 87773c1f7..2d0dd120b 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -213,13 +213,16 @@ impl InvertedIndexReader { pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<()> { let term_info_opt: Option = self.get_term_info_async(term).await?; if let Some(term_info) = term_info_opt { - self.postings_file_slice - .read_bytes_slice_async(term_info.postings_range.clone()) - .await?; + let postings = self + .postings_file_slice + .read_bytes_slice_async(term_info.postings_range.clone()); if with_positions { - self.positions_file_slice - .read_bytes_slice_async(term_info.positions_range.clone()) - .await?; + let positions = self + .positions_file_slice + .read_bytes_slice_async(term_info.positions_range.clone()); + futures::future::try_join(postings, positions).await?; + } else { + postings.await?; } } Ok(()) diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index ef4ab25ee..bbc733eb4 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -66,17 +66,6 @@ impl Dictionary { Ok(TSSTable::reader(data)) } - pub(crate) async fn sstable_reader_block_async( - &self, - block_addr: BlockAddr, - ) -> io::Result> { - let data = self - .sstable_slice - .read_bytes_slice_async(block_addr.byte_range) - .await?; - Ok(TSSTable::reader(data)) - } - pub(crate) async fn sstable_delta_reader_for_key_range_async( &self, key_range: impl RangeBounds<[u8]>, @@ -105,6 +94,17 @@ impl Dictionary { Ok(TSSTable::delta_reader(data)) } + pub(crate) async fn sstable_delta_reader_block_async( + &self, + block_addr: BlockAddr, + ) -> io::Result> { + let data = self + .sstable_slice + .read_bytes_slice_async(block_addr.byte_range) + .await?; + Ok(TSSTable::delta_reader(data)) + } + /// This function returns a file slice covering a set of sstable blocks /// that include the key range passed in arguments. Optionally returns /// only block for up to `limit` matching terms. @@ -228,17 +228,19 @@ impl Dictionary { self.num_terms as usize } - /// Returns the ordinal associated with a given term. - pub fn term_ord>(&self, key: K) -> io::Result> { + /// Decode a DeltaReader up to key, returning the number of terms traversed + /// + /// If the key was not found, returns Ok(None). + /// After calling this function, it is possible to call `DeltaReader::value` to get the + /// associated value. + fn decode_up_to_key>( + &self, + key: K, + sstable_delta_reader: &mut DeltaReader, + ) -> io::Result> { + let mut term_ord = 0; let key_bytes = key.as_ref(); - - let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else { - return Ok(None); - }; - - let mut term_ord = block_addr.first_ordinal; let mut ok_bytes = 0; - let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?; while sstable_delta_reader.advance()? { let prefix_len = sstable_delta_reader.common_prefix_len(); let suffix = sstable_delta_reader.suffix(); @@ -277,6 +279,20 @@ impl Dictionary { Ok(None) } + /// Returns the ordinal associated with a given term. + pub fn term_ord>(&self, key: K) -> io::Result> { + let key_bytes = key.as_ref(); + + let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else { + return Ok(None); + }; + + let first_ordinal = block_addr.first_ordinal; + let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?; + self.decode_up_to_key(key_bytes, &mut sstable_delta_reader) + .map(|opt| opt.map(|ord| ord + first_ordinal)) + } + /// Returns the term associated with a given term ordinal. /// /// Term ordinals are defined as the position of the term in @@ -322,14 +338,8 @@ impl Dictionary { /// Lookups the value corresponding to the key. pub fn get>(&self, key: K) -> io::Result> { if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) { - let mut sstable_reader = self.sstable_reader_block(block_addr)?; - let key_bytes = key.as_ref(); - while sstable_reader.advance()? { - if sstable_reader.key() == key_bytes { - let value = sstable_reader.value().clone(); - return Ok(Some(value)); - } - } + let sstable_reader = self.sstable_delta_reader_block(block_addr)?; + return self.do_get(key, sstable_reader); } Ok(None) } @@ -337,18 +347,24 @@ impl Dictionary { /// Lookups the value corresponding to the key. pub async fn get_async>(&self, key: K) -> io::Result> { if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) { - let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?; - let key_bytes = key.as_ref(); - while sstable_reader.advance()? { - if sstable_reader.key() == key_bytes { - let value = sstable_reader.value().clone(); - return Ok(Some(value)); - } - } + let sstable_reader = self.sstable_delta_reader_block_async(block_addr).await?; + return self.do_get(key, sstable_reader); } Ok(None) } + fn do_get>( + &self, + key: K, + mut reader: DeltaReader, + ) -> io::Result> { + if let Some(_ord) = self.decode_up_to_key(key, &mut reader)? { + Ok(Some(reader.value().clone())) + } else { + Ok(None) + } + } + /// Returns a range builder, to stream all of the terms /// within an interval. pub fn range(&self) -> StreamerBuilder {