Opportunistically seed forked block caches from the current one.

Adam Reichold
2023-12-11 11:32:39 +01:00
parent 0361a1edaa
commit d1177fe22f
2 changed files with 16 additions and 4 deletions


@@ -188,10 +188,11 @@ impl Searcher {
         let futures = groups
             .into_iter()
-            .map(|((segment_ord, _cache_key), doc_ids)| {
+            .map(|((segment_ord, cache_key), doc_ids)| {
                 // Each group fetches documents from exactly one block and
                 // therefore gets an independent block cache of size one.
-                let store_reader = self.inner.store_readers[segment_ord as usize].fork_cache(1);
+                let store_reader =
+                    self.inner.store_readers[segment_ord as usize].fork_cache(1, &[cache_key]);
                 async move {
                     let mut docs = Vec::new();


@@ -148,15 +148,26 @@ impl StoreReader {
     }

     /// Clones the given store reader with an independent block cache of the given size.
+    ///
+    /// `cache_keys` is used to seed the forked cache from the current cache
+    /// if some blocks are already available.
     #[cfg(feature = "quickwit")]
-    pub(crate) fn fork_cache(&self, cache_num_blocks: usize) -> Self {
-        Self {
+    pub(crate) fn fork_cache(&self, cache_num_blocks: usize, cache_keys: &[CacheKey]) -> Self {
+        let forked = Self {
             decompressor: self.decompressor,
             data: self.data.clone(),
             cache: BlockCache::new(cache_num_blocks),
             skip_index: Arc::clone(&self.skip_index),
             space_usage: self.space_usage.clone(),
+        };
+
+        for &CacheKey(pos) in cache_keys {
+            if let Some(block) = self.cache.get_from_cache(pos) {
+                forked.cache.put_into_cache(pos, block);
+            }
         }
+
+        forked
     }

     pub(crate) fn block_checkpoints(&self) -> impl Iterator<Item = Checkpoint> + '_ {
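
For readers skimming the diff, below is a minimal, self-contained sketch of the pattern the commit introduces. The CacheKey, BlockCache, and StoreReader types here are simplified stand-ins (a plain HashMap behind a Mutex), not tantivy's actual definitions: the fork starts with an empty cache, and any requested block that the current cache already holds is copied across, so the fork does not have to fetch and decompress it again; keys that are not cached yet are simply skipped, which is what makes the seeding opportunistic.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the cache key: the block's start offset.
#[derive(Clone, Copy)]
struct CacheKey(usize);

// Simplified block cache: a map from block offset to decompressed bytes.
#[derive(Default)]
struct BlockCache {
    blocks: Mutex<HashMap<usize, Arc<Vec<u8>>>>,
}

impl BlockCache {
    fn get_from_cache(&self, pos: usize) -> Option<Arc<Vec<u8>>> {
        self.blocks.lock().unwrap().get(&pos).cloned()
    }

    fn put_into_cache(&self, pos: usize, block: Arc<Vec<u8>>) {
        self.blocks.lock().unwrap().insert(pos, block);
    }
}

// Simplified store reader holding only the cache.
struct StoreReader {
    cache: BlockCache,
}

impl StoreReader {
    // Fork with a fresh cache, then opportunistically seed it: blocks that the
    // current cache already holds are copied over, missing ones are skipped.
    fn fork_cache(&self, cache_keys: &[CacheKey]) -> StoreReader {
        let forked = StoreReader {
            cache: BlockCache::default(),
        };
        for &CacheKey(pos) in cache_keys {
            if let Some(block) = self.cache.get_from_cache(pos) {
                forked.cache.put_into_cache(pos, block);
            }
        }
        forked
    }
}

fn main() {
    // Toy usage example: the parent reader already holds block 42.
    let reader = StoreReader {
        cache: BlockCache::default(),
    };
    reader.cache.put_into_cache(42, Arc::new(vec![1, 2, 3]));

    // The fork is seeded with block 42 because the parent already had it;
    // block 7 stays absent and would be fetched lazily on first access.
    let forked = reader.fork_cache(&[CacheKey(42), CacheKey(7)]);
    assert!(forked.cache.get_from_cache(42).is_some());
    assert!(forked.cache.get_from_cache(7).is_none());
}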