From 96f24b078e0b824387852858427eaf9785801ba6 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Jan 2021 22:43:28 +0900 Subject: [PATCH 1/3] Added failing unit test. --- src/functional_test.rs | 9 +++++--- src/store/index/mod.rs | 41 +++++++++++++++++++++++++++++++++-- src/store/index/skip_index.rs | 3 --- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/functional_test.rs b/src/functional_test.rs index 8c93bbb91..478a99686 100644 --- a/src/functional_test.rs +++ b/src/functional_test.rs @@ -1,9 +1,9 @@ -use std::collections::HashSet; -use rand::thread_rng; use crate::Index; use crate::Searcher; use crate::{doc, schema::*}; +use rand::thread_rng; use rand::Rng; +use std::collections::HashSet; fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> { assert!(searcher.segment_readers().len() < 20); @@ -84,7 +84,10 @@ fn test_functional_indexing() -> crate::Result<()> { reader.reload()?; let searcher = reader.searcher(); // check that everything is correct. - check_index_content(&searcher, &committed_docs.iter().cloned().collect::>())?; + check_index_content( + &searcher, + &committed_docs.iter().cloned().collect::>(), + )?; } else { if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) { let doc_id_term = Term::from_field_u64(id_field, random_val); diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index f0fe8fe2e..471848c12 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -39,13 +39,16 @@ impl fmt::Debug for Checkpoint { #[cfg(test)] mod tests { - use std::io; + use std::{io, iter}; + use futures::executor::block_on; use proptest::strategy::{BoxedStrategy, Strategy}; use crate::directory::OwnedBytes; + use crate::indexer::NoMergePolicy; + use crate::schema::{SchemaBuilder, STORED, STRING}; use crate::store::index::Checkpoint; - use crate::DocId; + use crate::{DocAddress, DocId, Index, Term}; use super::{SkipIndex, SkipIndexBuilder}; @@ -133,6 +136,40 @@ mod tests { (doc as u64) * (doc as u64) } + #[test] + fn test_merge_store_with_stacking() -> crate::Result<()> { + let mut schema_builder = SchemaBuilder::default(); + let text = schema_builder.add_text_field("text", STORED | STRING); + let body = schema_builder.add_text_field("body", STORED); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests()?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + let long_text: String = iter::repeat("abcdefghijklmnopqrstuvwxyz") + .take(1_000) + .collect(); + for _ in 0..20 { + index_writer.add_document(doc!(body=>long_text.clone())); + } + index_writer.commit()?; + index_writer.add_document(doc!(text=>"testb")); + for _ in 0..10 { + index_writer.add_document(doc!(text=>"testd", body=>long_text.clone())); + } + index_writer.commit()?; + index_writer.delete_term(Term::from_field_text(text, "testb")); + index_writer.commit()?; + let segment_ids = index.searchable_segment_ids()?; + block_on(index_writer.merge(&segment_ids))?; + let reader = index.reader()?; + let searcher = reader.searcher(); + assert_eq!(searcher.num_docs(), 30); + for i in 0..searcher.num_docs() as u32 { + let _doc = searcher.doc(DocAddress(0u32, i))?; + } + Ok(()) + } + #[test] fn test_skip_index_long() -> io::Result<()> { let mut output: Vec = Vec::new(); diff --git a/src/store/index/skip_index.rs b/src/store/index/skip_index.rs index c78acc96e..e8b431246 100644 --- a/src/store/index/skip_index.rs +++ b/src/store/index/skip_index.rs @@ -59,7 +59,6 @@ pub struct SkipIndex { } impl SkipIndex { - pub fn open(mut data: OwnedBytes) -> SkipIndex { let offsets: Vec = Vec::::deserialize(&mut data) .unwrap() @@ -108,6 +107,4 @@ impl SkipIndex { } Some(cur_checkpoint) } - - } From caf2a38b7e4c107ed9be02c348302c1834209fa4 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Jan 2021 18:36:12 +0900 Subject: [PATCH 2/3] Closes #969. The segment stacking optimization is not updating "first_doc_in_block". --- src/store/index/block.rs | 3 +++ src/store/index/mod.rs | 7 +++++++ src/store/index/skip_index_builder.rs | 15 +++++++++------ src/store/writer.rs | 13 +++++++++---- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/store/index/block.rs b/src/store/index/block.rs index 4c9eef490..33785748c 100644 --- a/src/store/index/block.rs +++ b/src/store/index/block.rs @@ -43,6 +43,9 @@ impl CheckpointBlock { /// Adding another checkpoint in the block. pub fn push(&mut self, checkpoint: Checkpoint) { + if let Some(prev_checkpoint) = self.checkpoints.last() { + assert!(checkpoint.follows(prev_checkpoint)); + } self.checkpoints.push(checkpoint); } diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index 471848c12..ab3bc89d4 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -26,6 +26,13 @@ pub struct Checkpoint { pub end_offset: u64, } +impl Checkpoint { + pub(crate) fn follows(&self, other: &Checkpoint) -> bool { + (self.start_doc == other.end_doc) && + (self.start_offset == other.end_offset) + } +} + impl fmt::Debug for Checkpoint { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs index 0306c3568..b873aab96 100644 --- a/src/store/index/skip_index_builder.rs +++ b/src/store/index/skip_index_builder.rs @@ -28,18 +28,20 @@ impl LayerBuilder { /// /// If the block was empty to begin with, simply return None. fn flush_block(&mut self) -> Option { - self.block.doc_interval().map(|(start_doc, end_doc)| { + if let Some((start_doc, end_doc)) = self.block.doc_interval() { let start_offset = self.buffer.len() as u64; self.block.serialize(&mut self.buffer); let end_offset = self.buffer.len() as u64; self.block.clear(); - Checkpoint { + Some(Checkpoint { start_doc, end_doc, start_offset, end_offset, - } - }) + }) + } else { + None + } } fn push(&mut self, checkpoint: Checkpoint) { @@ -48,7 +50,7 @@ impl LayerBuilder { fn insert(&mut self, checkpoint: Checkpoint) -> Option { self.push(checkpoint); - let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0; + let emit_skip_info = self.block.len() >= CHECKPOINT_PERIOD; if emit_skip_info { self.flush_block() } else { @@ -57,13 +59,14 @@ impl LayerBuilder { } } +#[derive(Default)] pub struct SkipIndexBuilder { layers: Vec, } impl SkipIndexBuilder { pub fn new() -> SkipIndexBuilder { - SkipIndexBuilder { layers: Vec::new() } + Self::default() } fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder { diff --git a/src/store/writer.rs b/src/store/writer.rs index 19592f9a0..3309f1a64 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -72,6 +72,7 @@ impl StoreWriter { if !self.current_block.is_empty() { self.write_and_compress_block()?; } + assert_eq!(self.first_doc_in_block, self.doc); let doc_shift = self.doc; let start_shift = self.writer.written_bytes() as u64; @@ -86,12 +87,17 @@ impl StoreWriter { checkpoint.end_doc += doc_shift; checkpoint.start_offset += start_shift; checkpoint.end_offset += start_shift; - self.offset_index_writer.insert(checkpoint); - self.doc = checkpoint.end_doc; + self.register_checkpoint(checkpoint); } Ok(()) } + fn register_checkpoint(&mut self, checkpoint: Checkpoint) { + self.offset_index_writer.insert(checkpoint); + self.first_doc_in_block = checkpoint.end_doc; + self.doc = checkpoint.end_doc; + } + fn write_and_compress_block(&mut self) -> io::Result<()> { assert!(self.doc > 0); self.intermediary_buffer.clear(); @@ -100,14 +106,13 @@ impl StoreWriter { self.writer.write_all(&self.intermediary_buffer)?; let end_offset = self.writer.written_bytes(); let end_doc = self.doc; - self.offset_index_writer.insert(Checkpoint { + self.register_checkpoint(Checkpoint { start_doc: self.first_doc_in_block, end_doc, start_offset, end_offset, }); self.current_block.clear(); - self.first_doc_in_block = self.doc; Ok(()) } From 203b0256a3d6b6968164320cc2305e03bb6379cf Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Jan 2021 22:45:03 +0900 Subject: [PATCH 3/3] Minor renaming --- src/store/index/mod.rs | 2 +- src/store/index/skip_index_builder.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index ab3bc89d4..708b6fbfe 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -144,7 +144,7 @@ mod tests { } #[test] - fn test_merge_store_with_stacking() -> crate::Result<()> { + fn test_merge_store_with_stacking_reproducing_issue969() -> crate::Result<()> { let mut schema_builder = SchemaBuilder::default(); let text = schema_builder.add_text_field("text", STORED | STRING); let body = schema_builder.add_text_field("body", STORED); diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs index b873aab96..6d46dabed 100644 --- a/src/store/index/skip_index_builder.rs +++ b/src/store/index/skip_index_builder.rs @@ -59,14 +59,13 @@ impl LayerBuilder { } } -#[derive(Default)] pub struct SkipIndexBuilder { layers: Vec, } impl SkipIndexBuilder { pub fn new() -> SkipIndexBuilder { - Self::default() + SkipIndexBuilder { layers: Vec::new() } } fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder {