diff --git a/src/functional_test.rs b/src/functional_test.rs index 8c93bbb91..478a99686 100644 --- a/src/functional_test.rs +++ b/src/functional_test.rs @@ -1,9 +1,9 @@ -use std::collections::HashSet; -use rand::thread_rng; use crate::Index; use crate::Searcher; use crate::{doc, schema::*}; +use rand::thread_rng; use rand::Rng; +use std::collections::HashSet; fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> { assert!(searcher.segment_readers().len() < 20); @@ -84,7 +84,10 @@ fn test_functional_indexing() -> crate::Result<()> { reader.reload()?; let searcher = reader.searcher(); // check that everything is correct. - check_index_content(&searcher, &committed_docs.iter().cloned().collect::>())?; + check_index_content( + &searcher, + &committed_docs.iter().cloned().collect::>(), + )?; } else { if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) { let doc_id_term = Term::from_field_u64(id_field, random_val); diff --git a/src/store/index/block.rs b/src/store/index/block.rs index 4c9eef490..33785748c 100644 --- a/src/store/index/block.rs +++ b/src/store/index/block.rs @@ -43,6 +43,9 @@ impl CheckpointBlock { /// Adding another checkpoint in the block. pub fn push(&mut self, checkpoint: Checkpoint) { + if let Some(prev_checkpoint) = self.checkpoints.last() { + assert!(checkpoint.follows(prev_checkpoint)); + } self.checkpoints.push(checkpoint); } diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index f0fe8fe2e..708b6fbfe 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -26,6 +26,13 @@ pub struct Checkpoint { pub end_offset: u64, } +impl Checkpoint { + pub(crate) fn follows(&self, other: &Checkpoint) -> bool { + (self.start_doc == other.end_doc) && + (self.start_offset == other.end_offset) + } +} + impl fmt::Debug for Checkpoint { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( @@ -39,13 +46,16 @@ impl fmt::Debug for Checkpoint { #[cfg(test)] mod tests { - use std::io; + use std::{io, iter}; + use futures::executor::block_on; use proptest::strategy::{BoxedStrategy, Strategy}; use crate::directory::OwnedBytes; + use crate::indexer::NoMergePolicy; + use crate::schema::{SchemaBuilder, STORED, STRING}; use crate::store::index::Checkpoint; - use crate::DocId; + use crate::{DocAddress, DocId, Index, Term}; use super::{SkipIndex, SkipIndexBuilder}; @@ -133,6 +143,40 @@ mod tests { (doc as u64) * (doc as u64) } + #[test] + fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> { + let mut schema_builder = SchemaBuilder::default(); + let text = schema_builder.add_text_field("text", STORED | STRING); + let body = schema_builder.add_text_field("body", STORED); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests()?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + let long_text: String = iter::repeat("abcdefghijklmnopqrstuvwxyz") + .take(1_000) + .collect(); + for _ in 0..20 { + index_writer.add_document(doc!(body=>long_text.clone())); + } + index_writer.commit()?; + index_writer.add_document(doc!(text=>"testb")); + for _ in 0..10 { + index_writer.add_document(doc!(text=>"testd", body=>long_text.clone())); + } + index_writer.commit()?; + index_writer.delete_term(Term::from_field_text(text, "testb")); + index_writer.commit()?; + let segment_ids = index.searchable_segment_ids()?; + block_on(index_writer.merge(&segment_ids))?; + let reader = index.reader()?; + let searcher = reader.searcher(); + assert_eq!(searcher.num_docs(), 30); + for i in 0..searcher.num_docs() as u32 { + let _doc = searcher.doc(DocAddress(0u32, i))?; + } + Ok(()) + } + #[test] fn test_skip_index_long() -> io::Result<()> { let mut output: Vec = Vec::new(); diff --git a/src/store/index/skip_index.rs b/src/store/index/skip_index.rs index c78acc96e..e8b431246 100644 --- a/src/store/index/skip_index.rs +++ b/src/store/index/skip_index.rs @@ -59,7 +59,6 @@ pub struct SkipIndex { } impl SkipIndex { - pub fn open(mut data: OwnedBytes) -> SkipIndex { let offsets: Vec = Vec::::deserialize(&mut data) .unwrap() @@ -108,6 +107,4 @@ impl SkipIndex { } Some(cur_checkpoint) } - - } diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs index 0306c3568..6d46dabed 100644 --- a/src/store/index/skip_index_builder.rs +++ b/src/store/index/skip_index_builder.rs @@ -28,18 +28,20 @@ impl LayerBuilder { /// /// If the block was empty to begin with, simply return None. fn flush_block(&mut self) -> Option { - self.block.doc_interval().map(|(start_doc, end_doc)| { + if let Some((start_doc, end_doc)) = self.block.doc_interval() { let start_offset = self.buffer.len() as u64; self.block.serialize(&mut self.buffer); let end_offset = self.buffer.len() as u64; self.block.clear(); - Checkpoint { + Some(Checkpoint { start_doc, end_doc, start_offset, end_offset, - } - }) + }) + } else { + None + } } fn push(&mut self, checkpoint: Checkpoint) { @@ -48,7 +50,7 @@ impl LayerBuilder { fn insert(&mut self, checkpoint: Checkpoint) -> Option { self.push(checkpoint); - let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0; + let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD; if emit_skip_info { self.flush_block() } else { diff --git a/src/store/writer.rs b/src/store/writer.rs index 19592f9a0..3309f1a64 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -72,6 +72,7 @@ impl StoreWriter { if !self.current_block.is_empty() { self.write_and_compress_block()?; } + assert_eq!(self.first_doc_in_block, self.doc); let doc_shift = self.doc; let start_shift = self.writer.written_bytes() as u64; @@ -86,12 +87,17 @@ impl StoreWriter { checkpoint.end_doc += doc_shift; checkpoint.start_offset += start_shift; checkpoint.end_offset += start_shift; - self.offset_index_writer.insert(checkpoint); - self.doc = checkpoint.end_doc; + self.register_checkpoint(checkpoint); } Ok(()) } + fn register_checkpoint(&mut self, checkpoint: Checkpoint) { + self.offset_index_writer.insert(checkpoint); + self.first_doc_in_block = checkpoint.end_doc; + self.doc = checkpoint.end_doc; + } + fn write_and_compress_block(&mut self) -> io::Result<()> { assert!(self.doc > 0); self.intermediary_buffer.clear(); @@ -100,14 +106,13 @@ impl StoreWriter { self.writer.write_all(&self.intermediary_buffer)?; let end_offset = self.writer.written_bytes(); let end_doc = self.doc; - self.offset_index_writer.insert(Checkpoint { + self.register_checkpoint(Checkpoint { start_doc: self.first_doc_in_block, end_doc, start_offset, end_offset, }); self.current_block.clear(); - self.first_doc_in_block = self.doc; Ok(()) }