diff --git a/src/store/index/block.rs b/src/store/index/block.rs index 4c9eef490..33785748c 100644 --- a/src/store/index/block.rs +++ b/src/store/index/block.rs @@ -43,6 +43,9 @@ impl CheckpointBlock { /// Adding another checkpoint in the block. pub fn push(&mut self, checkpoint: Checkpoint) { + if let Some(prev_checkpoint) = self.checkpoints.last() { + assert!(checkpoint.follows(prev_checkpoint)); + } self.checkpoints.push(checkpoint); } diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index 471848c12..ab3bc89d4 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -26,6 +26,13 @@ pub struct Checkpoint { pub end_offset: u64, } +impl Checkpoint { + pub(crate) fn follows(&self, other: &Checkpoint) -> bool { + (self.start_doc == other.end_doc) && + (self.start_offset == other.end_offset) + } +} + impl fmt::Debug for Checkpoint { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs index 0306c3568..b873aab96 100644 --- a/src/store/index/skip_index_builder.rs +++ b/src/store/index/skip_index_builder.rs @@ -28,18 +28,20 @@ impl LayerBuilder { /// /// If the block was empty to begin with, simply return None. fn flush_block(&mut self) -> Option { - self.block.doc_interval().map(|(start_doc, end_doc)| { + if let Some((start_doc, end_doc)) = self.block.doc_interval() { let start_offset = self.buffer.len() as u64; self.block.serialize(&mut self.buffer); let end_offset = self.buffer.len() as u64; self.block.clear(); - Checkpoint { + Some(Checkpoint { start_doc, end_doc, start_offset, end_offset, - } - }) + }) + } else { + None + } } fn push(&mut self, checkpoint: Checkpoint) { @@ -48,7 +50,7 @@ impl LayerBuilder { fn insert(&mut self, checkpoint: Checkpoint) -> Option { self.push(checkpoint); - let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0; + let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD; if emit_skip_info { self.flush_block() } else { @@ -57,13 +59,14 @@ impl LayerBuilder { } } +#[derive(Default)] pub struct SkipIndexBuilder { layers: Vec, } impl SkipIndexBuilder { pub fn new() -> SkipIndexBuilder { - SkipIndexBuilder { layers: Vec::new() } + Self::default() } fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder { diff --git a/src/store/writer.rs b/src/store/writer.rs index 19592f9a0..3309f1a64 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -72,6 +72,7 @@ impl StoreWriter { if !self.current_block.is_empty() { self.write_and_compress_block()?; } + assert_eq!(self.first_doc_in_block, self.doc); let doc_shift = self.doc; let start_shift = self.writer.written_bytes() as u64; @@ -86,12 +87,17 @@ impl StoreWriter { checkpoint.end_doc += doc_shift; checkpoint.start_offset += start_shift; checkpoint.end_offset += start_shift; - self.offset_index_writer.insert(checkpoint); - self.doc = checkpoint.end_doc; + self.register_checkpoint(checkpoint); } Ok(()) } + fn register_checkpoint(&mut self, checkpoint: Checkpoint) { + self.offset_index_writer.insert(checkpoint); + self.first_doc_in_block = checkpoint.end_doc; + self.doc = checkpoint.end_doc; + } + fn write_and_compress_block(&mut self) -> io::Result<()> { assert!(self.doc > 0); self.intermediary_buffer.clear(); @@ -100,14 +106,13 @@ impl StoreWriter { self.writer.write_all(&self.intermediary_buffer)?; let end_offset = self.writer.written_bytes(); let end_doc = self.doc; - self.offset_index_writer.insert(Checkpoint { + self.register_checkpoint(Checkpoint { start_doc: self.first_doc_in_block, end_doc, start_offset, end_offset, }); self.current_block.clear(); - self.first_doc_in_block = self.doc; Ok(()) }