diff --git a/Cargo.toml b/Cargo.toml
index 6a3647cb1..cfa706a4f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ murmurhash32 = "0.2"
 chrono = "0.4"
 smallvec = "1"
 rayon = "1"
+lru = "0.6"
 
 [target.'cfg(windows)'.dependencies]
 winapi = "0.3"
diff --git a/src/store/reader.rs b/src/store/reader.rs
index f93ae825b..626341b1b 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -6,18 +6,27 @@
 use crate::directory::{FileSlice, OwnedBytes};
 use crate::schema::Document;
 use crate::space_usage::StoreSpaceUsage;
 use crate::DocId;
-use std::cell::RefCell;
+use lru::LruCache;
 use std::io;
 use std::mem::size_of;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+
+const LRU_CACHE_CAPACITY: usize = 100;
+
+type Block = Arc<Vec<u8>>;
+
+type BlockCache = Arc<Mutex<LruCache<usize, Block>>>;
 
 /// Reads document off tantivy's [`Store`](./index.html)
 #[derive(Clone)]
 pub struct StoreReader {
     data: FileSlice,
     offset_index_file: OwnedBytes,
-    current_block_offset: RefCell<usize>,
-    current_block: RefCell<Vec<u8>>,
     max_doc: DocId,
+    cache: BlockCache,
+    cache_hits: Arc<AtomicUsize>,
+    cache_misses: Arc<AtomicUsize>,
 }
 
 impl StoreReader {
@@ -28,9 +37,10 @@ impl StoreReader {
         Ok(StoreReader {
             data: data_file,
             offset_index_file: offset_index_file.read_bytes()?,
-            current_block_offset: RefCell::new(usize::max_value()),
-            current_block: RefCell::new(Vec::new()),
             max_doc,
+            cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
+            cache_hits: Default::default(),
+            cache_misses: Default::default(),
         })
     }
 
@@ -55,15 +65,22 @@
         block_body.slice_to(block_len as usize).read_bytes()
     }
 
-    fn read_block(&self, block_offset: usize) -> io::Result<()> {
-        if block_offset != *self.current_block_offset.borrow() {
-            let mut current_block_mut = self.current_block.borrow_mut();
-            current_block_mut.clear();
-            let compressed_block = self.compressed_block(block_offset)?;
-            decompress(compressed_block.as_slice(), &mut current_block_mut)?;
-            *self.current_block_offset.borrow_mut() = block_offset;
+    fn read_block(&self, block_offset: usize) -> io::Result<Block> {
+        if let Some(block) = self.cache.lock().unwrap().get(&block_offset) {
+            self.cache_hits.fetch_add(1, Ordering::SeqCst);
+            return Ok(block.clone());
         }
-        Ok(())
+
+        self.cache_misses.fetch_add(1, Ordering::SeqCst);
+
+        let compressed_block = self.compressed_block(block_offset)?;
+        let mut decompressed_block = vec![];
+        decompress(compressed_block.as_slice(), &mut decompressed_block)?;
+
+        let block = Arc::new(decompressed_block);
+        self.cache.lock().unwrap().put(block_offset, block.clone());
+
+        Ok(block)
     }
 
     /// Reads a given document.
@@ -75,13 +92,13 @@
     /// for instance.
     pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
         let (first_doc_id, block_offset) = self.block_offset(doc_id);
-        self.read_block(block_offset as usize)?;
-        let current_block_mut = self.current_block.borrow_mut();
-        let mut cursor = &current_block_mut[..];
+        let mut cursor = &self.read_block(block_offset as usize)?[..];
+
         for _ in first_doc_id..doc_id {
             let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
             cursor = &cursor[doc_length..];
         }
+
         let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
         cursor = &cursor[..doc_length];
         Ok(Document::deserialize(&mut cursor)?)
@@ -107,3 +124,82 @@ fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice, DocId)> {
         max_doc,
     ))
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::schema::Document;
+    use crate::schema::Field;
+    use crate::{directory::RAMDirectory, store::tests::write_lorem_ipsum_store, Directory};
+    use std::path::Path;
+
+    fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
+        doc.get_first(*field).and_then(|f| f.text())
+    }
+
+    #[test]
+    fn test_store_lru_cache() -> crate::Result<()> {
+        let directory = RAMDirectory::create();
+        let path = Path::new("store");
+        let writer = directory.open_write(path)?;
+        let schema = write_lorem_ipsum_store(writer, 500);
+        let title = schema.get_field("title").unwrap();
+        let store_file = directory.open_read(path)?;
+        let store = StoreReader::open(store_file)?;
+
+        assert_eq!(store.cache.lock().unwrap().len(), 0);
+        assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
+        assert_eq!(store.cache_misses.load(Ordering::SeqCst), 0);
+
+        let doc = store.get(0)?;
+        assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
+
+        assert_eq!(store.cache.lock().unwrap().len(), 1);
+        assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
+        assert_eq!(store.cache_misses.load(Ordering::SeqCst), 1);
+        assert_eq!(
+            store
+                .cache
+                .lock()
+                .unwrap()
+                .peek_lru()
+                .map(|(&k, _)| k as usize),
+            Some(0)
+        );
+
+        let doc = store.get(499)?;
+        assert_eq!(get_text_field(&doc, &title), Some("Doc 499"));
+
+        assert_eq!(store.cache.lock().unwrap().len(), 2);
+        assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
+        assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
+
+        assert_eq!(
+            store
+                .cache
+                .lock()
+                .unwrap()
+                .peek_lru()
+                .map(|(&k, _)| k as usize),
+            Some(0)
+        );
+
+        let doc = store.get(0)?;
+        assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
+
+        assert_eq!(store.cache.lock().unwrap().len(), 2);
+        assert_eq!(store.cache_hits.load(Ordering::SeqCst), 1);
+        assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
+        assert_eq!(
+            store
+                .cache
+                .lock()
+                .unwrap()
+                .peek_lru()
+                .map(|(&k, _)| k as usize),
+            Some(18862)
+        );
+
+        Ok(())
+    }
+}
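
Note: the patch replaces the old single-slot `RefCell` block cache (which made `StoreReader` `!Sync` and gave each clone its own cache) with one `Arc<Mutex<LruCache>>` keyed by block offset, shared by all clones, plus atomic hit/miss counters. Below is a minimal standalone sketch of that pattern against the same `lru = "0.6"` API (`LruCache::new`, `get`, `put`). `BlockReader` and the `load_block` closure are hypothetical stand-ins for illustration only; in the real `read_block`, loading means slicing and decompressing the store file.

// Standalone sketch, not part of the patch above. Assumes lru = "0.6",
// whose `LruCache::new` still takes a plain `usize` capacity.
use lru::LruCache;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

// Blocks are shared: a cache hit clones an `Arc`, never the bytes.
type Block = Arc<Vec<u8>>;

#[derive(Clone)]
struct BlockReader {
    cache: Arc<Mutex<LruCache<usize, Block>>>,
    cache_hits: Arc<AtomicUsize>,
    cache_misses: Arc<AtomicUsize>,
}

impl BlockReader {
    fn new(capacity: usize) -> BlockReader {
        BlockReader {
            cache: Arc::new(Mutex::new(LruCache::new(capacity))),
            cache_hits: Default::default(),
            cache_misses: Default::default(),
        }
    }

    // Same shape as the patched `read_block`: probe the cache under the
    // lock, otherwise load the block and insert a shared handle to it.
    fn read_block(&self, offset: usize, load_block: impl Fn(usize) -> Vec<u8>) -> Block {
        if let Some(block) = self.cache.lock().unwrap().get(&offset) {
            self.cache_hits.fetch_add(1, Ordering::SeqCst);
            return block.clone();
        }
        self.cache_misses.fetch_add(1, Ordering::SeqCst);
        let block = Arc::new(load_block(offset));
        self.cache.lock().unwrap().put(offset, block.clone());
        block
    }
}

fn main() {
    let reader = BlockReader::new(2);
    let load = |offset: usize| vec![offset as u8; 4]; // fake "decompression"
    reader.read_block(0, load); // miss
    reader.read_block(0, load); // hit
    reader.read_block(16, load); // miss
    reader.read_block(32, load); // miss; capacity is 2, so offset 0 is evicted
    assert_eq!(reader.cache_hits.load(Ordering::SeqCst), 1);
    assert_eq!(reader.cache_misses.load(Ordering::SeqCst), 3);
}

One design note on the pattern: the mutex is released between the `get` and the `put`, so two threads that miss on the same offset may both load the block. The second `put` simply replaces the first entry, which is harmless here because blocks are immutable and handed out through `Arc`.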