mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-04 16:22:55 +00:00
Cache store reader blocks in an LRU fashion
This commit is contained in:
@@ -47,6 +47,7 @@ murmurhash32 = "0.2"
|
||||
chrono = "0.4"
|
||||
smallvec = "1"
|
||||
rayon = "1"
|
||||
lru = "0.6"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
winapi = "0.3"
|
||||
|
||||
@@ -6,18 +6,27 @@ use crate::directory::{FileSlice, OwnedBytes};
|
||||
use crate::schema::Document;
|
||||
use crate::space_usage::StoreSpaceUsage;
|
||||
use crate::DocId;
|
||||
use std::cell::RefCell;
|
||||
use lru::LruCache;
|
||||
use std::io;
|
||||
use std::mem::size_of;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
const LRU_CACHE_CAPACITY: usize = 100;
|
||||
|
||||
type Block = Arc<Vec<u8>>;
|
||||
|
||||
type BlockCache = Arc<Mutex<LruCache<usize, Block>>>;
|
||||
|
||||
/// Reads document off tantivy's [`Store`](./index.html)
|
||||
#[derive(Clone)]
|
||||
pub struct StoreReader {
|
||||
data: FileSlice,
|
||||
offset_index_file: OwnedBytes,
|
||||
current_block_offset: RefCell<usize>,
|
||||
current_block: RefCell<Vec<u8>>,
|
||||
max_doc: DocId,
|
||||
cache: BlockCache,
|
||||
cache_hits: Arc<AtomicUsize>,
|
||||
cache_misses: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl StoreReader {
|
||||
@@ -28,9 +37,10 @@ impl StoreReader {
|
||||
Ok(StoreReader {
|
||||
data: data_file,
|
||||
offset_index_file: offset_index_file.read_bytes()?,
|
||||
current_block_offset: RefCell::new(usize::max_value()),
|
||||
current_block: RefCell::new(Vec::new()),
|
||||
max_doc,
|
||||
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
|
||||
cache_hits: Default::default(),
|
||||
cache_misses: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -55,15 +65,22 @@ impl StoreReader {
|
||||
block_body.slice_to(block_len as usize).read_bytes()
|
||||
}
|
||||
|
||||
fn read_block(&self, block_offset: usize) -> io::Result<()> {
|
||||
if block_offset != *self.current_block_offset.borrow() {
|
||||
let mut current_block_mut = self.current_block.borrow_mut();
|
||||
current_block_mut.clear();
|
||||
let compressed_block = self.compressed_block(block_offset)?;
|
||||
decompress(compressed_block.as_slice(), &mut current_block_mut)?;
|
||||
*self.current_block_offset.borrow_mut() = block_offset;
|
||||
fn read_block(&self, block_offset: usize) -> io::Result<Block> {
|
||||
if let Some(block) = self.cache.lock().unwrap().get(&block_offset) {
|
||||
self.cache_hits.fetch_add(1, Ordering::SeqCst);
|
||||
return Ok(block.clone());
|
||||
}
|
||||
Ok(())
|
||||
|
||||
self.cache_misses.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
let compressed_block = self.compressed_block(block_offset)?;
|
||||
let mut decompressed_block = vec![];
|
||||
decompress(compressed_block.as_slice(), &mut decompressed_block)?;
|
||||
|
||||
let block = Arc::new(decompressed_block);
|
||||
self.cache.lock().unwrap().put(block_offset, block.clone());
|
||||
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
/// Reads a given document.
|
||||
@@ -75,13 +92,13 @@ impl StoreReader {
|
||||
/// for instance.
|
||||
pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
|
||||
let (first_doc_id, block_offset) = self.block_offset(doc_id);
|
||||
self.read_block(block_offset as usize)?;
|
||||
let current_block_mut = self.current_block.borrow_mut();
|
||||
let mut cursor = ¤t_block_mut[..];
|
||||
let mut cursor = &self.read_block(block_offset as usize)?[..];
|
||||
|
||||
for _ in first_doc_id..doc_id {
|
||||
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
|
||||
cursor = &cursor[doc_length..];
|
||||
}
|
||||
|
||||
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
|
||||
cursor = &cursor[..doc_length];
|
||||
Ok(Document::deserialize(&mut cursor)?)
|
||||
@@ -107,3 +124,82 @@ fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice, DocId)> {
|
||||
max_doc,
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::schema::Document;
|
||||
use crate::schema::Field;
|
||||
use crate::{directory::RAMDirectory, store::tests::write_lorem_ipsum_store, Directory};
|
||||
use std::path::Path;
|
||||
|
||||
fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
|
||||
doc.get_first(*field).and_then(|f| f.text())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_store_lru_cache() -> crate::Result<()> {
|
||||
let directory = RAMDirectory::create();
|
||||
let path = Path::new("store");
|
||||
let writer = directory.open_write(path)?;
|
||||
let schema = write_lorem_ipsum_store(writer, 500);
|
||||
let title = schema.get_field("title").unwrap();
|
||||
let store_file = directory.open_read(path)?;
|
||||
let store = StoreReader::open(store_file)?;
|
||||
|
||||
assert_eq!(store.cache.lock().unwrap().len(), 0);
|
||||
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 0);
|
||||
|
||||
let doc = store.get(0)?;
|
||||
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
|
||||
|
||||
assert_eq!(store.cache.lock().unwrap().len(), 1);
|
||||
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(
|
||||
store
|
||||
.cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.peek_lru()
|
||||
.map(|(&k, _)| k as usize),
|
||||
Some(0)
|
||||
);
|
||||
|
||||
let doc = store.get(499)?;
|
||||
assert_eq!(get_text_field(&doc, &title), Some("Doc 499"));
|
||||
|
||||
assert_eq!(store.cache.lock().unwrap().len(), 2);
|
||||
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
|
||||
|
||||
assert_eq!(
|
||||
store
|
||||
.cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.peek_lru()
|
||||
.map(|(&k, _)| k as usize),
|
||||
Some(0)
|
||||
);
|
||||
|
||||
let doc = store.get(0)?;
|
||||
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
|
||||
|
||||
assert_eq!(store.cache.lock().unwrap().len(), 2);
|
||||
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
|
||||
assert_eq!(
|
||||
store
|
||||
.cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.peek_lru()
|
||||
.map(|(&k, _)| k as usize),
|
||||
Some(18862)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user