From e337c35721098d889affb1555ec7513b31fc3786 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 7 Feb 2017 22:42:05 +0900 Subject: [PATCH] issue/43 SegmentMeta refactoring --- src/core/index.rs | 18 +++------ src/core/segment.rs | 30 +++++++-------- src/core/segment_meta.rs | 14 ++++--- src/core/segment_reader.rs | 17 +++++---- src/indexer/index_writer.rs | 68 +++++++++++++++++++-------------- src/indexer/log_merge_policy.rs | 51 +++++++++++++++---------- src/indexer/segment_register.rs | 6 +-- src/indexer/segment_updater.rs | 22 +++++------ src/postings/mod.rs | 4 +- 9 files changed, 121 insertions(+), 109 deletions(-) diff --git a/src/core/index.rs b/src/core/index.rs index 936087dd5..b287bd429 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -186,7 +186,7 @@ impl Index { let metas = load_metas(self.directory())?; Ok(metas .committed_segments - .iter() + .into_iter() .map(|segment_meta| self.segment(segment_meta)) .collect()) } @@ -200,20 +200,14 @@ impl Index { delete_segment(self.directory(), segment_id); } - /// Return a segment object given a `segment_id` - /// - /// The segment may or may not exist. - // pub fn segment(&self, segment_id: SegmentId, opstamp: u64) -> Segment { - // (self.clone(), segment_id, opstamp) - // } - - pub fn segment(&self, segment_meta: &SegmentMeta) -> Segment { - create_segment(self.clone(), segment_meta.segment_id, segment_meta.opstamp) + pub fn segment(&self, segment_meta: SegmentMeta) -> Segment { + create_segment(self.clone(), segment_meta) } /// Creates a new segment. - pub fn new_segment(&self, opstamp: u64) -> Segment { - create_segment(self.clone(), SegmentId::generate_random(), opstamp) + pub fn new_segment(&self) -> Segment { + let segment_meta = SegmentMeta::new(SegmentId::generate_random()); + create_segment(self.clone(), segment_meta) } /// Return a reference to the index directory. diff --git a/src/core/segment.rs b/src/core/segment.rs index 2eb750c58..2dcb774ba 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -9,30 +9,29 @@ use indexer::segment_serializer::SegmentSerializer; use super::SegmentComponent; use core::Index; use std::result; +use core::SegmentMeta; use directory::error::{FileError, OpenWriteError}; /// A segment is a piece of the index. #[derive(Clone)] pub struct Segment { index: Index, - segment_id: SegmentId, - opstamp: u64, + meta: SegmentMeta, } impl fmt::Debug for Segment { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Segment({:?})", self.segment_id.uuid_string()) + write!(f, "Segment({:?})", self.id().uuid_string()) } } /// Creates a new segment given an `Index` and a `SegmentId` /// /// The function is here to make it private outside `tantivy`. -pub fn create_segment(index: Index, segment_id: SegmentId, opstamp: u64) -> Segment { +pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment { Segment { index: index, - segment_id: segment_id, - opstamp: opstamp, + meta: meta, } } @@ -43,20 +42,21 @@ impl Segment { self.index.schema() } - pub fn opstamp(&self) -> u64 { - self.opstamp + pub fn meta(&self,) -> &SegmentMeta { + &self.meta } /// Returns the segment's id. pub fn id(&self,) -> SegmentId { - self.segment_id + self.meta.segment_id } - pub fn with_opstamp(&self, opstamp: u64) -> Segment { + pub fn with_delete_opstamp(self, opstamp: u64) -> Segment { + let mut meta = self.meta; + meta.delete_opstamp = Some(opstamp); Segment { - index: self.index.clone(), - segment_id: self.segment_id.clone(), - opstamp: opstamp, + index: self.index, + meta: meta, } } @@ -66,7 +66,7 @@ impl Segment { /// associated to a segment component. pub fn relative_path(&self, component: SegmentComponent) -> PathBuf { use self::SegmentComponent::*; - let mut path = self.segment_id.uuid_string(); + let mut path = self.id().uuid_string(); path.push_str(&*match component { POSITIONS => ".pos".to_string(), INFO => ".info".to_string(), @@ -75,7 +75,7 @@ impl Segment { STORE => ".store".to_string(), FASTFIELDS => ".fast".to_string(), FIELDNORMS => ".fieldnorm".to_string(), - DELETE => {format!(".{}.del", self.opstamp)}, + DELETE => {format!(".{}.del", self.meta.delete_opstamp.unwrap_or(0))}, }); PathBuf::from(path) } diff --git a/src/core/segment_meta.rs b/src/core/segment_meta.rs index 3d001e896..d8e9f8e6d 100644 --- a/src/core/segment_meta.rs +++ b/src/core/segment_meta.rs @@ -1,21 +1,23 @@ use core::SegmentId; + +// TODO Option + #[derive(Clone, Debug, RustcDecodable,RustcEncodable)] pub struct SegmentMeta { pub segment_id: SegmentId, pub num_docs: u32, pub num_deleted_docs: u32, - pub opstamp: u64, + pub delete_opstamp: Option, } -#[cfg(test)] impl SegmentMeta { - pub fn new(segment_id: SegmentId, num_docs: u32) -> SegmentMeta { + pub fn new(segment_id: SegmentId) -> SegmentMeta { SegmentMeta { segment_id: segment_id, - num_docs: num_docs, + num_docs: 0, num_deleted_docs: 0, - opstamp: 0u64, + delete_opstamp: None, } } -} \ No newline at end of file +} diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index f2ed4e7a4..af0e648a8 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -147,14 +147,15 @@ impl SegmentReader { .unwrap_or_else(|_| ReadOnlySource::empty()); // TODO 0u64 - let delete_data_res = segment.open_read(SegmentComponent::DELETE); - let delete_bitset; - if let Err(FileError::FileDoesNotExist(_)) = delete_data_res { - delete_bitset = DeleteBitSet::empty(); - } - else { - delete_bitset = DeleteBitSet::open(delete_data_res?); - } + let delete_bitset = + if segment.meta().delete_opstamp.is_some() { + let delete_data = segment.open_read(SegmentComponent::DELETE)?; + DeleteBitSet::open(delete_data) + } + else { + DeleteBitSet::empty() + }; + let schema = segment.schema(); Ok(SegmentReader { segment_info: segment_info, diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index dcc9126af..efc20d171 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -116,16 +116,11 @@ impl DocToOpstampMapping { pub fn advance_deletes( segment: &Segment, delete_cursor: &mut DeleteQueueCursor, - doc_opstamps: DocToOpstampMapping) -> Result<(u64, BitSet)> { + doc_opstamps: DocToOpstampMapping) -> Result> { let segment_reader = SegmentReader::open(segment.clone())?; - let mut delete_bitset = BitSet::new(); - for doc in 0u32..segment_reader.max_doc() { - if segment_reader.is_deleted(doc) { - delete_bitset.insert(doc as usize); - } - } - let mut has_changed = false; - let mut last_opstamp = segment.opstamp();//segment + let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize); + + let mut last_opstamp_opt: Option = None; for delete_op in delete_cursor { // A delete operation should only affect // document that were inserted after it. @@ -135,17 +130,26 @@ pub fn advance_deletes( let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp); if let Some(mut docset) = segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) { while docset.advance() { - has_changed = true; let deleted_doc = docset.doc(); if deleted_doc < limit_doc { - has_changed = true; delete_bitset.insert(deleted_doc as usize); } } + last_opstamp_opt = Some(delete_op.opstamp); } - last_opstamp = delete_op.opstamp; } - Ok((last_opstamp, delete_bitset)) + + if let Some(last_opstamp) = last_opstamp_opt { + for doc in 0u32..segment_reader.max_doc() { + if segment_reader.is_deleted(doc) { + delete_bitset.insert(doc as usize); + } + } + Ok(Some((last_opstamp, delete_bitset))) + } + else { + Ok(None) + } } fn index_documents(heap: &mut Heap, @@ -175,20 +179,25 @@ fn index_documents(heap: &mut Heap, let doc_opstamps: Vec = segment_writer.finalize()?; - let (last_opstamp_after_deletes, deleted_docset) = advance_deletes(&segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))?; - - { - let mut delete_file = segment.with_opstamp(last_opstamp_after_deletes).open_write(SegmentComponent::DELETE)?; - write_delete_bitset(&deleted_docset, &mut delete_file)?; - } - let num_deleted_docs = deleted_docset.len() as DocId; - - let segment_meta = SegmentMeta { - segment_id: segment_id, - num_docs: num_docs, - num_deleted_docs: num_deleted_docs, - opstamp: last_opstamp_after_deletes, - }; + let segment_meta = + if let Some((last_opstamp_after_deletes, deleted_docset)) = advance_deletes(&segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))? { + let mut delete_file = segment.with_delete_opstamp(last_opstamp_after_deletes).open_write(SegmentComponent::DELETE)?; + write_delete_bitset(&deleted_docset, &mut delete_file)?; + SegmentMeta { + segment_id: segment_id, + num_docs: num_docs, + num_deleted_docs: deleted_docset.len() as DocId, + delete_opstamp: Some(last_opstamp_after_deletes), + } + } + else { + SegmentMeta { + segment_id: segment_id, + num_docs: num_docs, + num_deleted_docs: 0, + delete_opstamp: None, + } + }; let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone()); @@ -252,6 +261,7 @@ impl IndexWriter { let mut document_iterator = document_receiver_clone.clone() .into_iter() .peekable(); + // the peeking here is to avoid // creating a new segment's files // if no document are available. @@ -269,8 +279,8 @@ impl IndexWriter { // was dropped. return Ok(()) } - - let segment = index.new_segment(opstamp); + + let segment = index.new_segment(); let valid_generation = index_documents(&mut heap, segment, &schema, diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 3eebdf78f..cc4cd56bc 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -120,11 +120,20 @@ mod tests { assert!(result_list.is_empty()); } + fn seg_meta(num_docs: u32) -> SegmentMeta { + SegmentMeta { + segment_id: SegmentId::generate_random(), + num_docs: num_docs, + num_deleted_docs: 0u32, + delete_opstamp: None, + } + } + #[test] fn test_log_merge_policy_pair() { - let test_input = vec![SegmentMeta::new(SegmentId::generate_random(), 10), - SegmentMeta::new(SegmentId::generate_random(), 10), - SegmentMeta::new(SegmentId::generate_random(), 10)]; + let test_input = vec![seg_meta(10), + seg_meta(10), + seg_meta(10)]; let result_list = test_merge_policy().compute_merge_candidates(&test_input); assert_eq!(result_list.len(), 1); } @@ -132,12 +141,12 @@ mod tests { #[test] fn test_log_merge_policy_levels() { // multiple levels all get merged correctly - let test_input = vec![SegmentMeta::new(SegmentId::generate_random(), 10), - SegmentMeta::new(SegmentId::generate_random(), 10), - SegmentMeta::new(SegmentId::generate_random(), 10), - SegmentMeta::new(SegmentId::generate_random(), 1000), - SegmentMeta::new(SegmentId::generate_random(), 1000), - SegmentMeta::new(SegmentId::generate_random(), 1000)]; + let test_input = vec![seg_meta(10), + seg_meta(10), + seg_meta(10), + seg_meta(1000), + seg_meta(1000), + seg_meta(1000)]; let result_list = test_merge_policy().compute_merge_candidates(&test_input); assert_eq!(result_list.len(), 2); } @@ -145,24 +154,24 @@ mod tests { #[test] fn test_log_merge_policy_within_levels() { // multiple levels all get merged correctly - let test_input = vec![SegmentMeta::new(SegmentId::generate_random(), 10), - SegmentMeta::new(SegmentId::generate_random(), 11), - SegmentMeta::new(SegmentId::generate_random(), 12), - SegmentMeta::new(SegmentId::generate_random(), 1000), - SegmentMeta::new(SegmentId::generate_random(), 1000), - SegmentMeta::new(SegmentId::generate_random(), 1000)]; + let test_input = vec![seg_meta(10), + seg_meta(11), + seg_meta(12), + seg_meta(1000), + seg_meta(1000), + seg_meta(1000)]; let result_list = test_merge_policy().compute_merge_candidates(&test_input); assert_eq!(result_list.len(), 2); } #[test] fn test_log_merge_policy_small_segments() { // multiple levels all get merged correctly - let test_input = vec![SegmentMeta::new(SegmentId::generate_random(), 1), - SegmentMeta::new(SegmentId::generate_random(), 1), - SegmentMeta::new(SegmentId::generate_random(), 1), - SegmentMeta::new(SegmentId::generate_random(), 2), - SegmentMeta::new(SegmentId::generate_random(), 2), - SegmentMeta::new(SegmentId::generate_random(), 2)]; + let test_input = vec![seg_meta(1), + seg_meta(1), + seg_meta(1), + seg_meta(2), + seg_meta(2), + seg_meta(2)]; let result_list = test_merge_policy().compute_merge_candidates(&test_input); assert_eq!(result_list.len(), 1); } diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index eb28ae5c4..ce3271efa 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -195,14 +195,14 @@ mod tests { let segment_id_merged = SegmentId::generate_random(); { - let segment_meta = SegmentMeta::new(segment_id_a, 10); + let segment_meta = SegmentMeta::new(segment_id_a); let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state, SegmentState::Ready); assert_eq!(segment_register.segment_ids(), vec!(segment_id_a)); { - let segment_meta = SegmentMeta::new(segment_id_b, 20); + let segment_meta = SegmentMeta::new(segment_id_b); let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); segment_register.add_segment_entry(segment_entry); } @@ -214,7 +214,7 @@ mod tests { segment_register.remove_segment(&segment_id_a); segment_register.remove_segment(&segment_id_b); { - let segment_meta_merged = SegmentMeta::new(segment_id_merged, 10 + 20); + let segment_meta_merged = SegmentMeta::new(segment_id_merged); let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor()); segment_register.add_segment_entry(segment_entry); } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 1c518dc98..b6f129323 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -177,13 +177,13 @@ impl SegmentUpdater { fn purge_deletes(&self, target_opstamp: u64) -> Result<()> { let uncommitted = self.0.segment_manager.segment_entries(); for mut segment_entry in uncommitted { - let mut segment = self.0.index.segment(segment_entry.meta()); - let (_, deleted_docset) = advance_deletes( + let mut segment = self.0.index.segment(segment_entry.meta().clone()); + if let Some((_, deleted_docset)) = advance_deletes( &segment, segment_entry.delete_cursor(), - DocToOpstampMapping::None).unwrap(); - { - let mut delete_file = segment.with_opstamp(target_opstamp).open_write(SegmentComponent::DELETE)?; + DocToOpstampMapping::None).unwrap() + { + let mut delete_file = segment.with_delete_opstamp(target_opstamp).open_write(SegmentComponent::DELETE)?; write_delete_bitset(&deleted_docset, &mut delete_file)?; } } @@ -237,20 +237,16 @@ impl SegmentUpdater { let segments: Vec = segment_metas .iter() - .map(|ref segment_meta| index.segment(segment_meta)) + .cloned() + .map(|segment_meta| index.segment(segment_meta)) .collect(); // An IndexMerger is like a "view" of our merged segments. // TODO unwrap let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); - let opstamp = segment_metas - .iter() - .map(|meta| meta.opstamp) - .max() - .unwrap(); - let mut merged_segment = index.new_segment(opstamp); + let mut merged_segment = index.new_segment(); // ... we just serialize this index merger in our new segment // to merge the two segments. @@ -260,7 +256,7 @@ impl SegmentUpdater { segment_id: merged_segment.id(), num_docs: num_docs, num_deleted_docs: 0u32, - opstamp: opstamp, + delete_opstamp: None, // TODO fix delete_opstamp }; // TODO fix delete cursor diff --git a/src/postings/mod.rs b/src/postings/mod.rs index b7676710f..f9898b9fc 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -61,7 +61,7 @@ mod tests { let text_field = schema_builder.add_text_field("text", TEXT); let schema = schema_builder.build(); let index = Index::create_in_ram(schema); - let mut segment = index.new_segment(0u64); + let mut segment = index.new_segment(); let mut posting_serializer = PostingsSerializer::open(&mut segment).unwrap(); let term = Term::from_field_text(text_field, "abc"); posting_serializer.new_term(&term, 3).unwrap(); @@ -81,7 +81,7 @@ mod tests { let text_field = schema_builder.add_text_field("text", TEXT); let schema = schema_builder.build(); let index = Index::create_in_ram(schema.clone()); - let segment = index.new_segment(0u64); + let segment = index.new_segment(); let heap = Heap::with_capacity(10_000_000); { let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema).unwrap();