From 7f0e61b173457ff09fe9a899c2331eb00c15e87e Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 10 Nov 2020 10:27:32 +0900 Subject: [PATCH] Refactoring of the skip index. The skip index now identifies both the start and the end offset of blocks. Checkpoints are compressed in blocks, reaching better compression. --- src/common/mod.rs | 4 - src/query/boolean_query/block_wand.rs | 1 + src/store/index/block.rs | 165 ++++++++++++++++++ src/store/index/mod.rs | 230 +++++++++++++++++++++++++ src/store/index/skip_index.rs | 112 ++++++++++++ src/store/index/skip_index_builder.rs | 115 +++++++++++++ src/store/mod.rs | 2 +- src/store/reader.rs | 83 +++++---- src/store/skiplist/mod.rs | 168 ------------------ src/store/skiplist/skiplist.rs | 133 -------------- src/store/skiplist/skiplist_builder.rs | 98 ----------- src/store/writer.rs | 41 +++-- 12 files changed, 691 insertions(+), 461 deletions(-) create mode 100644 src/store/index/block.rs create mode 100644 src/store/index/mod.rs create mode 100644 src/store/index/skip_index.rs create mode 100644 src/store/index/skip_index_builder.rs delete mode 100644 src/store/skiplist/mod.rs delete mode 100644 src/store/skiplist/skiplist.rs delete mode 100644 src/store/skiplist/skiplist_builder.rs diff --git a/src/common/mod.rs b/src/common/mod.rs index c5f23d84d..1dcb98afa 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -66,10 +66,6 @@ pub(crate) fn compute_num_bits(n: u64) -> u8 { } } -pub(crate) fn is_power_of_2(n: usize) -> bool { - (n > 0) && (n & (n - 1) == 0) -} - /// Has length trait pub trait HasLen { /// Return length diff --git a/src/query/boolean_query/block_wand.rs b/src/query/boolean_query/block_wand.rs index 7c4fe616c..2a6bd43b4 100644 --- a/src/query/boolean_query/block_wand.rs +++ b/src/query/boolean_query/block_wand.rs @@ -533,6 +533,7 @@ mod tests { #![proptest_config(ProptestConfig::with_cases(500))] #[ignore] #[test] + #[ignore] fn test_block_wand_three_term_scorers((posting_lists, fieldnorms) in gen_term_scorers(3)) { test_block_wand_aux(&posting_lists[..], &fieldnorms[..]); } diff --git a/src/store/index/block.rs b/src/store/index/block.rs new file mode 100644 index 000000000..4c9eef490 --- /dev/null +++ b/src/store/index/block.rs @@ -0,0 +1,165 @@ +use crate::common::VInt; +use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD}; +use crate::DocId; +use std::io; + +/// Represents a block of checkpoints. +/// +/// The DocStore index checkpoints are organized into block +/// for code-readability and compression purpose. +/// +/// A block can be of any size. +pub struct CheckpointBlock { + pub checkpoints: Vec, +} + +impl Default for CheckpointBlock { + fn default() -> CheckpointBlock { + CheckpointBlock { + checkpoints: Vec::with_capacity(2 * CHECKPOINT_PERIOD), + } + } +} + +impl CheckpointBlock { + /// If non-empty returns [start_doc, end_doc) + /// for the overall block. + pub fn doc_interval(&self) -> Option<(DocId, DocId)> { + let start_doc_opt = self + .checkpoints + .first() + .cloned() + .map(|checkpoint| checkpoint.start_doc); + let end_doc_opt = self + .checkpoints + .last() + .cloned() + .map(|checkpoint| checkpoint.end_doc); + match (start_doc_opt, end_doc_opt) { + (Some(start_doc), Some(end_doc)) => Some((start_doc, end_doc)), + _ => None, + } + } + + /// Adding another checkpoint in the block. + pub fn push(&mut self, checkpoint: Checkpoint) { + self.checkpoints.push(checkpoint); + } + + /// Returns the number of checkpoints in the block. + pub fn len(&self) -> usize { + self.checkpoints.len() + } + + pub fn get(&self, idx: usize) -> Checkpoint { + self.checkpoints[idx] + } + + pub fn clear(&mut self) { + self.checkpoints.clear(); + } + + pub fn serialize(&mut self, buffer: &mut Vec) { + VInt(self.checkpoints.len() as u64).serialize_into_vec(buffer); + if self.checkpoints.is_empty() { + return; + } + VInt(self.checkpoints[0].start_doc as u64).serialize_into_vec(buffer); + VInt(self.checkpoints[0].start_offset as u64).serialize_into_vec(buffer); + for checkpoint in &self.checkpoints { + let delta_doc = checkpoint.end_doc - checkpoint.start_doc; + VInt(delta_doc as u64).serialize_into_vec(buffer); + VInt(checkpoint.end_offset - checkpoint.start_offset).serialize_into_vec(buffer); + } + } + + pub fn deserialize(&mut self, data: &mut &[u8]) -> io::Result<()> { + if data.is_empty() { + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "")); + } + self.checkpoints.clear(); + let len = VInt::deserialize_u64(data)? as usize; + if len == 0 { + return Ok(()); + } + let mut doc = VInt::deserialize_u64(data)? as DocId; + let mut start_offset = VInt::deserialize_u64(data)?; + for _ in 0..len { + let num_docs = VInt::deserialize_u64(data)? as DocId; + let block_num_bytes = VInt::deserialize_u64(data)?; + self.checkpoints.push(Checkpoint { + start_doc: doc, + end_doc: doc + num_docs, + start_offset, + end_offset: start_offset + block_num_bytes, + }); + doc += num_docs; + start_offset += block_num_bytes; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::store::index::block::CheckpointBlock; + use crate::store::index::Checkpoint; + use crate::DocId; + use std::io; + + fn test_aux_ser_deser(checkpoints: &[Checkpoint]) -> io::Result<()> { + let mut block = CheckpointBlock::default(); + for &checkpoint in checkpoints { + block.push(checkpoint); + } + let mut buffer = Vec::new(); + block.serialize(&mut buffer); + let mut block_deser = CheckpointBlock::default(); + let checkpoint = Checkpoint { + start_doc: 0, + end_doc: 1, + start_offset: 2, + end_offset: 3, + }; + block_deser.push(checkpoint); // < check that value is erased before deser + let mut data = &buffer[..]; + block_deser.deserialize(&mut data)?; + assert!(data.is_empty()); + assert_eq!(checkpoints, &block_deser.checkpoints[..]); + Ok(()) + } + + #[test] + fn test_block_serialize_empty() -> io::Result<()> { + test_aux_ser_deser(&[]) + } + + #[test] + fn test_block_serialize_simple() -> io::Result<()> { + let checkpoints = vec![Checkpoint { + start_doc: 10, + end_doc: 12, + start_offset: 100, + end_offset: 120, + }]; + test_aux_ser_deser(&checkpoints) + } + + #[test] + fn test_block_serialize() -> io::Result<()> { + let offsets: Vec = (0..11).map(|i| i * i * i).collect(); + let mut checkpoints = vec![]; + let mut start_doc = 0; + for i in 0..10 { + let end_doc = (i * i) as DocId; + checkpoints.push(Checkpoint { + start_doc, + end_doc, + start_offset: offsets[i], + end_offset: offsets[i + 1], + }); + start_doc = end_doc; + } + test_aux_ser_deser(&checkpoints) + } +} diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs new file mode 100644 index 000000000..f0779f60a --- /dev/null +++ b/src/store/index/mod.rs @@ -0,0 +1,230 @@ +const CHECKPOINT_PERIOD: usize = 8; + +use std::fmt; +mod block; +mod skip_index; +mod skip_index_builder; + +use crate::DocId; + +pub use self::skip_index::SkipIndex; +pub use self::skip_index_builder::SkipIndexBuilder; + +/// A checkpoint contains meta-information about +/// a block. Either a block of documents, or another block +/// of checkpoints. +/// +/// All of the intervals here defined are semi-open. +/// The checkpoint describes that the block within the bytes +/// `[start_offset..end_offset)` spans over the docs +/// `[start_doc..end_doc)`. +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct Checkpoint { + pub start_doc: DocId, + pub end_doc: DocId, + pub start_offset: u64, + pub end_offset: u64, +} + +impl fmt::Debug for Checkpoint { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "(doc=[{}..{}), bytes=[{}..{}))", + self.start_doc, self.end_doc, self.start_offset, self.end_offset + ) + } +} + +#[cfg(test)] +mod tests { + + use std::io; + + use proptest::strategy::{BoxedStrategy, Strategy}; + + use crate::directory::OwnedBytes; + use crate::store::index::Checkpoint; + use crate::DocId; + + use super::{SkipIndex, SkipIndexBuilder}; + + #[test] + fn test_skip_index_empty() -> io::Result<()> { + let mut output: Vec = Vec::new(); + let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); + skip_index_builder.write(&mut output)?; + let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output)); + let mut skip_cursor = skip_index.checkpoints(); + assert!(skip_cursor.next().is_none()); + Ok(()) + } + + #[test] + fn test_skip_index_single_el() -> io::Result<()> { + let mut output: Vec = Vec::new(); + let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); + let checkpoint = Checkpoint { + start_doc: 0, + end_doc: 2, + start_offset: 0, + end_offset: 3, + }; + skip_index_builder.insert(checkpoint); + skip_index_builder.write(&mut output)?; + let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output)); + let mut skip_cursor = skip_index.checkpoints(); + assert_eq!(skip_cursor.next(), Some(checkpoint)); + assert_eq!(skip_cursor.next(), None); + Ok(()) + } + + #[test] + fn test_skip_index() -> io::Result<()> { + let mut output: Vec = Vec::new(); + let checkpoints = vec![ + Checkpoint { + start_doc: 0, + end_doc: 3, + start_offset: 4, + end_offset: 9, + }, + Checkpoint { + start_doc: 3, + end_doc: 4, + start_offset: 9, + end_offset: 25, + }, + Checkpoint { + start_doc: 4, + end_doc: 6, + start_offset: 25, + end_offset: 49, + }, + Checkpoint { + start_doc: 6, + end_doc: 8, + start_offset: 49, + end_offset: 81, + }, + Checkpoint { + start_doc: 8, + end_doc: 10, + start_offset: 81, + end_offset: 100, + }, + ]; + + let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); + for &checkpoint in &checkpoints { + skip_index_builder.insert(checkpoint); + } + skip_index_builder.write(&mut output)?; + + let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output)); + assert_eq!( + &skip_index.checkpoints().collect::>()[..], + &checkpoints[..] + ); + Ok(()) + } + + fn offset_test(doc: DocId) -> u64 { + (doc as u64) * (doc as u64) + } + + #[test] + fn test_skip_index_long() -> io::Result<()> { + let mut output: Vec = Vec::new(); + let checkpoints: Vec = (0..1000) + .map(|i| Checkpoint { + start_doc: i, + end_doc: i + 1, + start_offset: offset_test(i), + end_offset: offset_test(i + 1), + }) + .collect(); + let mut skip_index_builder = SkipIndexBuilder::new(); + for checkpoint in &checkpoints { + skip_index_builder.insert(*checkpoint); + } + skip_index_builder.write(&mut output)?; + assert_eq!(output.len(), 4035); + let resulting_checkpoints: Vec = SkipIndex::from(OwnedBytes::new(output)) + .checkpoints() + .collect(); + assert_eq!(&resulting_checkpoints, &checkpoints); + Ok(()) + } + + fn integrate_delta(mut vals: Vec) -> Vec { + let mut prev = 0u64; + for val in vals.iter_mut() { + let new_val = *val + prev; + prev = new_val; + *val = new_val; + } + vals + } + + // Generates a sequence of n valid checkpoints, with n < max_len. + fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy> { + (1..max_len) + .prop_flat_map(move |len: usize| { + ( + proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta), + proptest::collection::vec(1u64..26u64, len as usize).prop_map(integrate_delta), + ) + .prop_map(|(docs, offsets)| { + (0..docs.len() - 1) + .map(move |i| Checkpoint { + start_doc: docs[i] as DocId, + end_doc: docs[i + 1] as DocId, + start_offset: offsets[i], + end_offset: offsets[i + 1], + }) + .collect::>() + }) + }) + .boxed() + } + + fn seek_manual>( + checkpoints: I, + target: DocId, + ) -> Option { + checkpoints + .into_iter() + .filter(|checkpoint| checkpoint.end_doc > target) + .next() + } + + fn test_skip_index_aux(skip_index: SkipIndex, checkpoints: &[Checkpoint]) { + if let Some(last_checkpoint) = checkpoints.last() { + for doc in 0u32..last_checkpoint.end_doc { + let expected = seek_manual(skip_index.checkpoints(), doc); + assert_eq!(expected, skip_index.seek(doc), "Doc {}", doc); + } + assert!(skip_index.seek(last_checkpoint.end_doc).is_none()); + } + } + + use proptest::prelude::*; + + proptest! { + #![proptest_config(ProptestConfig::with_cases(20))] + #[test] + fn test_proptest_skip(checkpoints in monotonic_checkpoints(100)) { + let mut skip_index_builder = SkipIndexBuilder::new(); + for checkpoint in checkpoints.iter().cloned() { + skip_index_builder.insert(checkpoint); + } + let mut buffer = Vec::new(); + skip_index_builder.write(&mut buffer).unwrap(); + let skip_index = SkipIndex::from(OwnedBytes::new(buffer)); + let iter_checkpoints: Vec = skip_index.checkpoints().collect(); + assert_eq!(&checkpoints[..], &iter_checkpoints[..]); + test_skip_index_aux(skip_index, &checkpoints[..]); + } + } +} diff --git a/src/store/index/skip_index.rs b/src/store/index/skip_index.rs new file mode 100644 index 000000000..8b304ca93 --- /dev/null +++ b/src/store/index/skip_index.rs @@ -0,0 +1,112 @@ +use crate::common::{BinarySerializable, VInt}; +use crate::directory::OwnedBytes; +use crate::store::index::block::CheckpointBlock; +use crate::store::index::Checkpoint; +use crate::DocId; + +pub struct LayerCursor<'a> { + remaining: &'a [u8], + block: CheckpointBlock, + cursor: usize, +} + +impl<'a> Iterator for LayerCursor<'a> { + type Item = Checkpoint; + + fn next(&mut self) -> Option { + if self.cursor == self.block.len() { + if self.remaining.is_empty() { + return None; + } + let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining); + if let Err(_) = block_mut.deserialize(remaining_mut) { + return None; + } + self.cursor = 0; + } + let res = Some(self.block.get(self.cursor)); + self.cursor += 1; + res + } +} + +struct Layer { + data: OwnedBytes, +} + +impl Layer { + fn cursor<'a>(&'a self) -> impl Iterator + 'a { + self.cursor_at_offset(0u64) + } + + fn cursor_at_offset<'a>(&'a self, start_offset: u64) -> impl Iterator + 'a { + let data = &self.data.as_slice(); + LayerCursor { + remaining: &data[start_offset as usize..], + block: CheckpointBlock::default(), + cursor: 0, + } + } + + fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option { + self.cursor_at_offset(offset) + .filter(|checkpoint| checkpoint.end_doc > target) + .next() + } +} + +pub struct SkipIndex { + layers: Vec, +} + +impl SkipIndex { + pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator + 'a { + self.layers + .last() + .into_iter() + .flat_map(|layer| layer.cursor()) + } + + pub fn seek(&self, target: DocId) -> Option { + let first_layer_len = self + .layers + .first() + .map(|layer| layer.data.len() as u64) + .unwrap_or(0u64); + let mut cur_checkpoint = Checkpoint { + start_doc: 0u32, + end_doc: 1u32, + start_offset: 0u64, + end_offset: first_layer_len, + }; + for layer in &self.layers { + if let Some(checkpoint) = + layer.seek_start_at_offset(target, cur_checkpoint.start_offset) + { + cur_checkpoint = checkpoint; + } else { + return None; + } + } + Some(cur_checkpoint) + } +} + +impl From for SkipIndex { + fn from(mut data: OwnedBytes) -> SkipIndex { + let offsets: Vec = Vec::::deserialize(&mut data) + .unwrap() + .into_iter() + .map(|el| el.0) + .collect(); + let mut start_offset = 0; + let mut layers = Vec::new(); + for end_offset in offsets { + layers.push(Layer { + data: data.slice(start_offset as usize, end_offset as usize), + }); + start_offset = end_offset; + } + SkipIndex { layers } + } +} diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs new file mode 100644 index 000000000..0306c3568 --- /dev/null +++ b/src/store/index/skip_index_builder.rs @@ -0,0 +1,115 @@ +use crate::common::{BinarySerializable, VInt}; +use crate::store::index::block::CheckpointBlock; +use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD}; +use std::io; +use std::io::Write; + +// Each skip contains iterator over pairs (last doc in block, offset to start of block). + +struct LayerBuilder { + buffer: Vec, + pub block: CheckpointBlock, +} + +impl LayerBuilder { + fn finish(self) -> Vec { + self.buffer + } + + fn new() -> LayerBuilder { + LayerBuilder { + buffer: Vec::new(), + block: CheckpointBlock::default(), + } + } + + /// Serializes the block, and return a checkpoint representing + /// the entire block. + /// + /// If the block was empty to begin with, simply return None. + fn flush_block(&mut self) -> Option { + self.block.doc_interval().map(|(start_doc, end_doc)| { + let start_offset = self.buffer.len() as u64; + self.block.serialize(&mut self.buffer); + let end_offset = self.buffer.len() as u64; + self.block.clear(); + Checkpoint { + start_doc, + end_doc, + start_offset, + end_offset, + } + }) + } + + fn push(&mut self, checkpoint: Checkpoint) { + self.block.push(checkpoint); + } + + fn insert(&mut self, checkpoint: Checkpoint) -> Option { + self.push(checkpoint); + let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0; + if emit_skip_info { + self.flush_block() + } else { + None + } + } +} + +pub struct SkipIndexBuilder { + layers: Vec, +} + +impl SkipIndexBuilder { + pub fn new() -> SkipIndexBuilder { + SkipIndexBuilder { layers: Vec::new() } + } + + fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder { + if layer_id == self.layers.len() { + let layer_builder = LayerBuilder::new(); + self.layers.push(layer_builder); + } + &mut self.layers[layer_id] + } + + pub fn insert(&mut self, checkpoint: Checkpoint) { + let mut skip_pointer = Some(checkpoint); + for layer_id in 0.. { + if let Some(checkpoint) = skip_pointer { + skip_pointer = self.get_layer(layer_id).insert(checkpoint); + } else { + break; + } + } + } + + pub fn write(mut self, output: &mut W) -> io::Result<()> { + let mut last_pointer = None; + for skip_layer in self.layers.iter_mut() { + if let Some(checkpoint) = last_pointer { + skip_layer.push(checkpoint); + } + last_pointer = skip_layer.flush_block(); + } + let layer_buffers: Vec> = self + .layers + .into_iter() + .rev() + .map(|layer| layer.finish()) + .collect(); + + let mut layer_offset = 0; + let mut layer_sizes = Vec::new(); + for layer_buffer in &layer_buffers { + layer_offset += layer_buffer.len() as u64; + layer_sizes.push(VInt(layer_offset)); + } + layer_sizes.serialize(output)?; + for layer_buffer in layer_buffers { + output.write_all(&layer_buffer[..])?; + } + Ok(()) + } +} diff --git a/src/store/mod.rs b/src/store/mod.rs index 7327d65fb..6eff6ddd7 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -33,8 +33,8 @@ and should rely on either !*/ +mod index; mod reader; -mod skiplist; mod writer; pub use self::reader::StoreReader; pub use self::writer::StoreWriter; diff --git a/src/store/reader.rs b/src/store/reader.rs index 626341b1b..fa62a3257 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -1,10 +1,11 @@ use super::decompress; -use super::skiplist::SkipList; +use super::index::SkipIndex; use crate::common::VInt; use crate::common::{BinarySerializable, HasLen}; use crate::directory::{FileSlice, OwnedBytes}; use crate::schema::Document; use crate::space_usage::StoreSpaceUsage; +use crate::store::index::Checkpoint; use crate::DocId; use lru::LruCache; use std::io; @@ -16,69 +17,73 @@ const LRU_CACHE_CAPACITY: usize = 100; type Block = Arc>; -type BlockCache = Arc>>; +type BlockCache = Arc>>; /// Reads document off tantivy's [`Store`](./index.html) -#[derive(Clone)] pub struct StoreReader { data: FileSlice, - offset_index_file: OwnedBytes, - max_doc: DocId, cache: BlockCache, cache_hits: Arc, cache_misses: Arc, + skip_index: Arc, + space_usage: StoreSpaceUsage, } impl StoreReader { /// Opens a store reader - // TODO rename open pub fn open(store_file: FileSlice) -> io::Result { - let (data_file, offset_index_file, max_doc) = split_file(store_file)?; + let (data_file, offset_index_file) = split_file(store_file)?; + let index_data = offset_index_file.read_bytes()?; + let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len()); + let skip_index = SkipIndex::from(index_data); Ok(StoreReader { data: data_file, - offset_index_file: offset_index_file.read_bytes()?, - max_doc, cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))), cache_hits: Default::default(), cache_misses: Default::default(), + skip_index: Arc::new(skip_index), + space_usage, }) } - pub(crate) fn block_index(&self) -> SkipList<'_, u64> { - SkipList::from(self.offset_index_file.as_slice()) + pub(crate) fn block_checkpoints<'a>(&'a self) -> impl Iterator + 'a { + self.skip_index.checkpoints() } - fn block_offset(&self, doc_id: DocId) -> (DocId, u64) { - self.block_index() - .seek(u64::from(doc_id) + 1) - .map(|(doc, offset)| (doc as DocId, offset)) - .unwrap_or((0u32, 0u64)) + fn block_checkpoint(&self, doc_id: DocId) -> Option { + self.skip_index.seek(doc_id) } pub(crate) fn block_data(&self) -> io::Result { self.data.read_bytes() } - fn compressed_block(&self, addr: usize) -> io::Result { - let (block_len_bytes, block_body) = self.data.slice_from(addr).split(4); - let block_len = u32::deserialize(&mut block_len_bytes.read_bytes()?)?; - block_body.slice_to(block_len as usize).read_bytes() + fn compressed_block(&self, checkpoint: &Checkpoint) -> io::Result { + self.data + .slice( + checkpoint.start_offset as usize, + checkpoint.end_offset as usize, + ) + .read_bytes() } - fn read_block(&self, block_offset: usize) -> io::Result { - if let Some(block) = self.cache.lock().unwrap().get(&block_offset) { + fn read_block(&self, checkpoint: &Checkpoint) -> io::Result { + if let Some(block) = self.cache.lock().unwrap().get(&checkpoint.start_offset) { self.cache_hits.fetch_add(1, Ordering::SeqCst); return Ok(block.clone()); } self.cache_misses.fetch_add(1, Ordering::SeqCst); - let compressed_block = self.compressed_block(block_offset)?; + let compressed_block = self.compressed_block(checkpoint)?; let mut decompressed_block = vec![]; decompress(compressed_block.as_slice(), &mut decompressed_block)?; let block = Arc::new(decompressed_block); - self.cache.lock().unwrap().put(block_offset, block.clone()); + self.cache + .lock() + .unwrap() + .put(checkpoint.start_offset, block.clone()); Ok(block) } @@ -91,10 +96,11 @@ impl StoreReader { /// It should not be called to score documents /// for instance. pub fn get(&self, doc_id: DocId) -> crate::Result { - let (first_doc_id, block_offset) = self.block_offset(doc_id); - let mut cursor = &self.read_block(block_offset as usize)?[..]; - - for _ in first_doc_id..doc_id { + let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| { + crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id)) + })?; + let mut cursor = &self.read_block(&checkpoint)?[..]; + for _ in checkpoint.start_doc..doc_id { let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; cursor = &cursor[doc_length..]; } @@ -106,23 +112,16 @@ impl StoreReader { /// Summarize total space usage of this store reader. pub fn space_usage(&self) -> StoreSpaceUsage { - StoreSpaceUsage::new(self.data.len(), self.offset_index_file.len()) + self.space_usage.clone() } } -fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice, DocId)> { - let data_len = data.len(); - let footer_offset = data_len - size_of::() - size_of::(); - let serialized_offset: OwnedBytes = data.slice(footer_offset, data_len).read_bytes()?; +fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> { + let (data, footer_len_bytes) = data.split_from_end(size_of::()); + let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?; let mut serialized_offset_buf = serialized_offset.as_slice(); - let offset = u64::deserialize(&mut serialized_offset_buf)?; - let offset = offset as usize; - let max_doc = u32::deserialize(&mut serialized_offset_buf)?; - Ok(( - data.slice(0, offset), - data.slice(offset, footer_offset), - max_doc, - )) + let offset = u64::deserialize(&mut serialized_offset_buf)? as usize; + Ok(data.split(offset)) } #[cfg(test)] @@ -197,7 +196,7 @@ mod tests { .unwrap() .peek_lru() .map(|(&k, _)| k as usize), - Some(18862) + Some(18806) ); Ok(()) diff --git a/src/store/skiplist/mod.rs b/src/store/skiplist/mod.rs deleted file mode 100644 index 6269f8ae8..000000000 --- a/src/store/skiplist/mod.rs +++ /dev/null @@ -1,168 +0,0 @@ -#![allow(dead_code)] - -mod skiplist; -mod skiplist_builder; - -pub use self::skiplist::SkipList; -pub use self::skiplist_builder::SkipListBuilder; - -#[cfg(test)] -mod tests { - - use super::{SkipList, SkipListBuilder}; - - #[test] - fn test_skiplist() { - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(8); - skip_list_builder.insert(2, &3).unwrap(); - skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice()); - assert_eq!(skip_list.next(), Some((2, 3))); - } - - #[test] - fn test_skiplist2() { - let mut output: Vec = Vec::new(); - let skip_list_builder: SkipListBuilder = SkipListBuilder::new(8); - skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice()); - assert_eq!(skip_list.next(), None); - } - - #[test] - fn test_skiplist3() { - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2); - skip_list_builder.insert(2, &()).unwrap(); - skip_list_builder.insert(3, &()).unwrap(); - skip_list_builder.insert(5, &()).unwrap(); - skip_list_builder.insert(7, &()).unwrap(); - skip_list_builder.insert(9, &()).unwrap(); - skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice()); - assert_eq!(skip_list.next().unwrap(), (2, ())); - assert_eq!(skip_list.next().unwrap(), (3, ())); - assert_eq!(skip_list.next().unwrap(), (5, ())); - assert_eq!(skip_list.next().unwrap(), (7, ())); - assert_eq!(skip_list.next().unwrap(), (9, ())); - assert_eq!(skip_list.next(), None); - } - - #[test] - fn test_skiplist4() { - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2); - skip_list_builder.insert(2, &()).unwrap(); - skip_list_builder.insert(3, &()).unwrap(); - skip_list_builder.insert(5, &()).unwrap(); - skip_list_builder.insert(7, &()).unwrap(); - skip_list_builder.insert(9, &()).unwrap(); - skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice()); - assert_eq!(skip_list.next().unwrap(), (2, ())); - skip_list.seek(5); - assert_eq!(skip_list.next().unwrap(), (5, ())); - assert_eq!(skip_list.next().unwrap(), (7, ())); - assert_eq!(skip_list.next().unwrap(), (9, ())); - assert_eq!(skip_list.next(), None); - } - - #[test] - fn test_skiplist5() { - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4); - skip_list_builder.insert(2, &()).unwrap(); - skip_list_builder.insert(3, &()).unwrap(); - skip_list_builder.insert(5, &()).unwrap(); - skip_list_builder.insert(6, &()).unwrap(); - skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice()); - assert_eq!(skip_list.next().unwrap(), (2, ())); - skip_list.seek(6); - assert_eq!(skip_list.next().unwrap(), (6, ())); - assert_eq!(skip_list.next(), None); - } - - #[test] - fn test_skiplist6() { - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2); - skip_list_builder.insert(2, &()).unwrap(); - skip_list_builder.insert(3, &()).unwrap(); - skip_list_builder.insert(5, &()).unwrap(); - skip_list_builder.insert(7, &()).unwrap(); - skip_list_builder.insert(9, &()).unwrap(); - skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice()); - assert_eq!(skip_list.next().unwrap(), (2, ())); - skip_list.seek(10); - assert_eq!(skip_list.next(), None); - } - - #[test] - fn test_skiplist7() { - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4); - for i in 0..1000 { - skip_list_builder.insert(i, &()).unwrap(); - } - skip_list_builder.insert(1004, &()).unwrap(); - skip_list_builder.write::>(&mut output).unwrap(); - let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice()); - assert_eq!(skip_list.next().unwrap(), (0, ())); - skip_list.seek(431); - assert_eq!(skip_list.next().unwrap(), (431, ())); - skip_list.seek(1003); - assert_eq!(skip_list.next().unwrap(), (1004, ())); - assert_eq!(skip_list.next(), None); - } - - #[test] - fn test_skiplist8() { - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(8); - skip_list_builder.insert(2, &3).unwrap(); - skip_list_builder.write::>(&mut output).unwrap(); - assert_eq!(output.len(), 11); - assert_eq!(output[0], 1u8 + 128u8); - } - - #[test] - fn test_skiplist9() { - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder = SkipListBuilder::new(4); - for i in 0..4 * 4 * 4 { - skip_list_builder.insert(i, &i).unwrap(); - } - skip_list_builder.write::>(&mut output).unwrap(); - assert_eq!(output.len(), 774); - assert_eq!(output[0], 4u8 + 128u8); - } - - #[test] - fn test_skiplist10() { - // checking that void gets serialized to nothing. - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4); - for i in 0..((4 * 4 * 4) - 1) { - skip_list_builder.insert(i, &()).unwrap(); - } - skip_list_builder.write::>(&mut output).unwrap(); - assert_eq!(output.len(), 230); - assert_eq!(output[0], 128u8 + 3u8); - } - - #[test] - fn test_skiplist11() { - // checking that void gets serialized to nothing. - let mut output: Vec = Vec::new(); - let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4); - for i in 0..(4 * 4) { - skip_list_builder.insert(i, &()).unwrap(); - } - skip_list_builder.write::>(&mut output).unwrap(); - assert_eq!(output.len(), 65); - assert_eq!(output[0], 128u8 + 3u8); - } -} diff --git a/src/store/skiplist/skiplist.rs b/src/store/skiplist/skiplist.rs deleted file mode 100644 index f5cbde036..000000000 --- a/src/store/skiplist/skiplist.rs +++ /dev/null @@ -1,133 +0,0 @@ -use crate::common::{BinarySerializable, VInt}; -use std::cmp::max; -use std::marker::PhantomData; - -static EMPTY: [u8; 0] = []; - -struct Layer<'a, T> { - data: &'a [u8], - cursor: &'a [u8], - next_id: Option, - _phantom_: PhantomData, -} - -impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> { - type Item = (u64, T); - - fn next(&mut self) -> Option<(u64, T)> { - if let Some(cur_id) = self.next_id { - let cur_val = T::deserialize(&mut self.cursor).unwrap(); - self.next_id = VInt::deserialize_u64(&mut self.cursor).ok(); - Some((cur_id, cur_val)) - } else { - None - } - } -} - -impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> { - fn from(data: &'a [u8]) -> Layer<'a, T> { - let mut cursor = data; - let next_id = VInt::deserialize_u64(&mut cursor).ok(); - Layer { - data, - cursor, - next_id, - _phantom_: PhantomData, - } - } -} - -impl<'a, T: BinarySerializable> Layer<'a, T> { - fn empty() -> Layer<'a, T> { - Layer { - data: &EMPTY, - cursor: &EMPTY, - next_id: None, - _phantom_: PhantomData, - } - } - - fn seek_offset(&mut self, offset: usize) { - self.cursor = &self.data[offset..]; - self.next_id = VInt::deserialize_u64(&mut self.cursor).ok(); - } - - // Returns the last element (key, val) - // such that (key < doc_id) - // - // If there is no such element anymore, - // returns None. - // - // If the element exists, it will be returned - // at the next call to `.next()`. - fn seek(&mut self, key: u64) -> Option<(u64, T)> { - let mut result: Option<(u64, T)> = None; - loop { - if let Some(next_id) = self.next_id { - if next_id < key { - if let Some(v) = self.next() { - result = Some(v); - continue; - } - } - } - return result; - } - } -} - -pub struct SkipList<'a, T: BinarySerializable> { - data_layer: Layer<'a, T>, - skip_layers: Vec>, -} - -impl<'a, T: BinarySerializable> Iterator for SkipList<'a, T> { - type Item = (u64, T); - - fn next(&mut self) -> Option<(u64, T)> { - self.data_layer.next() - } -} - -impl<'a, T: BinarySerializable> SkipList<'a, T> { - pub fn seek(&mut self, key: u64) -> Option<(u64, T)> { - let mut next_layer_skip: Option<(u64, u64)> = None; - for skip_layer in &mut self.skip_layers { - if let Some((_, offset)) = next_layer_skip { - skip_layer.seek_offset(offset as usize); - } - next_layer_skip = skip_layer.seek(key); - } - if let Some((_, offset)) = next_layer_skip { - self.data_layer.seek_offset(offset as usize); - } - self.data_layer.seek(key) - } -} - -impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> { - fn from(mut data: &'a [u8]) -> SkipList<'a, T> { - let offsets: Vec = Vec::::deserialize(&mut data) - .unwrap() - .into_iter() - .map(|el| el.0) - .collect(); - let num_layers = offsets.len(); - let layers_data: &[u8] = data; - let data_layer: Layer<'a, T> = if num_layers == 0 { - Layer::empty() - } else { - let first_layer_data: &[u8] = &layers_data[..offsets[0] as usize]; - Layer::from(first_layer_data) - }; - let skip_layers = (0..max(1, num_layers) - 1) - .map(|i| (offsets[i] as usize, offsets[i + 1] as usize)) - .map(|(start, stop)| Layer::from(&layers_data[start..stop])) - .collect(); - SkipList { - skip_layers, - data_layer, - } - } -} diff --git a/src/store/skiplist/skiplist_builder.rs b/src/store/skiplist/skiplist_builder.rs deleted file mode 100644 index 104fc3b0c..000000000 --- a/src/store/skiplist/skiplist_builder.rs +++ /dev/null @@ -1,98 +0,0 @@ -use crate::common::{is_power_of_2, BinarySerializable, VInt}; -use std::io; -use std::io::Write; -use std::marker::PhantomData; - -struct LayerBuilder { - period_mask: usize, - buffer: Vec, - len: usize, - _phantom_: PhantomData, -} - -impl LayerBuilder { - fn written_size(&self) -> usize { - self.buffer.len() - } - - fn write(&self, output: &mut dyn Write) -> Result<(), io::Error> { - output.write_all(&self.buffer)?; - Ok(()) - } - - fn with_period(period: usize) -> LayerBuilder { - assert!(is_power_of_2(period), "The period has to be a power of 2."); - LayerBuilder { - period_mask: (period - 1), - buffer: Vec::new(), - len: 0, - _phantom_: PhantomData, - } - } - - fn insert(&mut self, key: u64, value: &T) -> io::Result> { - self.len += 1; - let offset = self.written_size() as u64; - VInt(key).serialize_into_vec(&mut self.buffer); - value.serialize(&mut self.buffer)?; - let emit_skip_info = (self.period_mask & self.len) == 0; - if emit_skip_info { - Ok(Some((key, offset))) - } else { - Ok(None) - } - } -} - -pub struct SkipListBuilder { - period: usize, - data_layer: LayerBuilder, - skip_layers: Vec>, -} - -impl SkipListBuilder { - pub fn new(period: usize) -> SkipListBuilder { - SkipListBuilder { - period, - data_layer: LayerBuilder::with_period(period), - skip_layers: Vec::new(), - } - } - - fn get_skip_layer(&mut self, layer_id: usize) -> &mut LayerBuilder { - if layer_id == self.skip_layers.len() { - let layer_builder = LayerBuilder::with_period(self.period); - self.skip_layers.push(layer_builder); - } - &mut self.skip_layers[layer_id] - } - - pub fn insert(&mut self, key: u64, dest: &T) -> io::Result<()> { - let mut skip_pointer = self.data_layer.insert(key, dest)?; - for layer_id in 0.. { - if let Some((skip_doc_id, skip_offset)) = skip_pointer { - skip_pointer = self - .get_skip_layer(layer_id) - .insert(skip_doc_id, &skip_offset)?; - } else { - break; - } - } - Ok(()) - } - - pub fn write(self, output: &mut W) -> io::Result<()> { - let mut size: u64 = self.data_layer.buffer.len() as u64; - let mut layer_sizes = vec![VInt(size)]; - for layer in self.skip_layers.iter().rev() { - size += layer.buffer.len() as u64; - layer_sizes.push(VInt(size)); - } - layer_sizes.serialize(output)?; - self.data_layer.write(output)?; - for layer in self.skip_layers.iter().rev() { - layer.write(output)?; - } - Ok(()) - } -} diff --git a/src/store/writer.rs b/src/store/writer.rs index ea1b69835..19592f9a0 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,11 +1,12 @@ use super::compress; -use super::skiplist::SkipListBuilder; +use super::index::SkipIndexBuilder; use super::StoreReader; use crate::common::CountingWriter; use crate::common::{BinarySerializable, VInt}; use crate::directory::TerminatingWrite; use crate::directory::WritePtr; use crate::schema::Document; +use crate::store::index::Checkpoint; use crate::DocId; use std::io::{self, Write}; @@ -21,7 +22,8 @@ const BLOCK_SIZE: usize = 16_384; /// pub struct StoreWriter { doc: DocId, - offset_index_writer: SkipListBuilder, + first_doc_in_block: DocId, + offset_index_writer: SkipIndexBuilder, writer: CountingWriter, intermediary_buffer: Vec, current_block: Vec, @@ -35,7 +37,8 @@ impl StoreWriter { pub fn new(writer: WritePtr) -> StoreWriter { StoreWriter { doc: 0, - offset_index_writer: SkipListBuilder::new(4), + first_doc_in_block: 0, + offset_index_writer: SkipIndexBuilder::new(), writer: CountingWriter::wrap(writer), intermediary_buffer: Vec::new(), current_block: Vec::new(), @@ -68,11 +71,9 @@ impl StoreWriter { pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> { if !self.current_block.is_empty() { self.write_and_compress_block()?; - self.offset_index_writer - .insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?; } - let doc_offset = self.doc; - let start_offset = self.writer.written_bytes() as u64; + let doc_shift = self.doc; + let start_shift = self.writer.written_bytes() as u64; // just bulk write all of the block of the given reader. self.writer @@ -80,22 +81,33 @@ impl StoreWriter { // concatenate the index of the `store_reader`, after translating // its start doc id and its start file offset. - for (next_doc_id, block_addr) in store_reader.block_index() { - self.doc = doc_offset + next_doc_id as u32; - self.offset_index_writer - .insert(u64::from(self.doc), &(start_offset + block_addr))?; + for mut checkpoint in store_reader.block_checkpoints() { + checkpoint.start_doc += doc_shift; + checkpoint.end_doc += doc_shift; + checkpoint.start_offset += start_shift; + checkpoint.end_offset += start_shift; + self.offset_index_writer.insert(checkpoint); + self.doc = checkpoint.end_doc; } Ok(()) } fn write_and_compress_block(&mut self) -> io::Result<()> { + assert!(self.doc > 0); self.intermediary_buffer.clear(); compress(&self.current_block[..], &mut self.intermediary_buffer)?; - (self.intermediary_buffer.len() as u32).serialize(&mut self.writer)?; + let start_offset = self.writer.written_bytes(); self.writer.write_all(&self.intermediary_buffer)?; - self.offset_index_writer - .insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?; + let end_offset = self.writer.written_bytes(); + let end_doc = self.doc; + self.offset_index_writer.insert(Checkpoint { + start_doc: self.first_doc_in_block, + end_doc, + start_offset, + end_offset, + }); self.current_block.clear(); + self.first_doc_in_block = self.doc; Ok(()) } @@ -110,7 +122,6 @@ impl StoreWriter { let header_offset: u64 = self.writer.written_bytes() as u64; self.offset_index_writer.write(&mut self.writer)?; header_offset.serialize(&mut self.writer)?; - self.doc.serialize(&mut self.writer)?; self.writer.terminate() } }