optimise warmup code path (#2007)

* optimise warmup code path

* better function naming
This commit is contained in:
trinity-1686a
2023-04-21 11:23:09 +02:00
committed by GitHub
parent 74f9eafefc
commit 9c93bfeb51
3 changed files with 62 additions and 42 deletions

View File

@@ -54,6 +54,7 @@ itertools = "0.10.3"
measure_time = "0.8.2"
async-trait = "0.1.53"
arc-swap = "1.5.0"
futures = "0.3.28"
columnar = { version="0.1", path="./columnar", package ="tantivy-columnar" }
sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }

View File

@@ -213,13 +213,16 @@ impl InvertedIndexReader {
pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<()> {
let term_info_opt: Option<TermInfo> = self.get_term_info_async(term).await?;
if let Some(term_info) = term_info_opt {
self.postings_file_slice
.read_bytes_slice_async(term_info.postings_range.clone())
.await?;
let postings = self
.postings_file_slice
.read_bytes_slice_async(term_info.postings_range.clone());
if with_positions {
self.positions_file_slice
.read_bytes_slice_async(term_info.positions_range.clone())
.await?;
let positions = self
.positions_file_slice
.read_bytes_slice_async(term_info.positions_range.clone());
futures::future::try_join(postings, positions).await?;
} else {
postings.await?;
}
}
Ok(())

View File

@@ -66,17 +66,6 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
Ok(TSSTable::reader(data))
}
pub(crate) async fn sstable_reader_block_async(
&self,
block_addr: BlockAddr,
) -> io::Result<Reader<TSSTable::ValueReader>> {
let data = self
.sstable_slice
.read_bytes_slice_async(block_addr.byte_range)
.await?;
Ok(TSSTable::reader(data))
}
pub(crate) async fn sstable_delta_reader_for_key_range_async(
&self,
key_range: impl RangeBounds<[u8]>,
@@ -105,6 +94,17 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
Ok(TSSTable::delta_reader(data))
}
pub(crate) async fn sstable_delta_reader_block_async(
&self,
block_addr: BlockAddr,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let data = self
.sstable_slice
.read_bytes_slice_async(block_addr.byte_range)
.await?;
Ok(TSSTable::delta_reader(data))
}
/// This function returns a file slice covering a set of sstable blocks
/// that include the key range passed in arguments. Optionally returns
/// only block for up to `limit` matching terms.
@@ -228,17 +228,19 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
self.num_terms as usize
}
/// Returns the ordinal associated with a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
/// Decode a DeltaReader up to key, returning the number of terms traversed
///
/// If the key was not found, returns Ok(None).
/// After calling this function, it is possible to call `DeltaReader::value` to get the
/// associated value.
fn decode_up_to_key<K: AsRef<[u8]>>(
&self,
key: K,
sstable_delta_reader: &mut DeltaReader<TSSTable::ValueReader>,
) -> io::Result<Option<TermOrdinal>> {
let mut term_ord = 0;
let key_bytes = key.as_ref();
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else {
return Ok(None);
};
let mut term_ord = block_addr.first_ordinal;
let mut ok_bytes = 0;
let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?;
while sstable_delta_reader.advance()? {
let prefix_len = sstable_delta_reader.common_prefix_len();
let suffix = sstable_delta_reader.suffix();
@@ -277,6 +279,20 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
Ok(None)
}
/// Returns the ordinal associated with a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
let key_bytes = key.as_ref();
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else {
return Ok(None);
};
let first_ordinal = block_addr.first_ordinal;
let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?;
self.decode_up_to_key(key_bytes, &mut sstable_delta_reader)
.map(|opt| opt.map(|ord| ord + first_ordinal))
}
/// Returns the term associated with a given term ordinal.
///
/// Term ordinals are defined as the position of the term in
@@ -322,14 +338,8 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Lookups the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
let mut sstable_reader = self.sstable_reader_block(block_addr)?;
let key_bytes = key.as_ref();
while sstable_reader.advance()? {
if sstable_reader.key() == key_bytes {
let value = sstable_reader.value().clone();
return Ok(Some(value));
}
}
let sstable_reader = self.sstable_delta_reader_block(block_addr)?;
return self.do_get(key, sstable_reader);
}
Ok(None)
}
@@ -337,18 +347,24 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Lookups the value corresponding to the key.
pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?;
let key_bytes = key.as_ref();
while sstable_reader.advance()? {
if sstable_reader.key() == key_bytes {
let value = sstable_reader.value().clone();
return Ok(Some(value));
}
}
let sstable_reader = self.sstable_delta_reader_block_async(block_addr).await?;
return self.do_get(key, sstable_reader);
}
Ok(None)
}
fn do_get<K: AsRef<[u8]>>(
&self,
key: K,
mut reader: DeltaReader<TSSTable::ValueReader>,
) -> io::Result<Option<TSSTable::Value>> {
if let Some(_ord) = self.decode_up_to_key(key, &mut reader)? {
Ok(Some(reader.value().clone()))
} else {
Ok(None)
}
}
/// Returns a range builder, to stream all of the terms
/// within an interval.
pub fn range(&self) -> StreamerBuilder<TSSTable> {