From 09782858daea57d08b4ea3da620e629d01b4a84a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 1 Feb 2017 10:06:32 +0900 Subject: [PATCH] issue/43 Segment have a commit opstamp --- src/core/index.rs | 78 ++++++++++++++++++++++++++------- src/core/index_meta.rs | 4 +- src/core/segment.rs | 42 ++---------------- src/core/segment_meta.rs | 2 + src/error.rs | 2 +- src/indexer/index_writer.rs | 41 +++++++++-------- src/indexer/segment_manager.rs | 19 +++++--- src/indexer/segment_register.rs | 5 ++- src/indexer/segment_updater.rs | 66 +++++++++++++++++++--------- src/postings/mod.rs | 4 +- 10 files changed, 157 insertions(+), 106 deletions(-) diff --git a/src/core/index.rs b/src/core/index.rs index 05296188a..6b62763bd 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -21,6 +21,7 @@ use core::IndexMeta; use core::META_FILEPATH; use super::segment::create_segment; use indexer::segment_updater::save_new_metas; +use directory::error::{FileError, OpenWriteError}; const NUM_SEARCHERS: usize = 12; @@ -41,6 +42,45 @@ pub struct Index { docstamp: u64, } + + + +/// Deletes all of the document of the segment. +/// This is called when there is a merge or a rollback. +/// +/// # Disclaimer +/// If deletion of a file fails (e.g. a file +/// was read-only.), the method does not +/// fail and just logs an error when it fails. +pub fn delete_segment(directory: &Directory, segment_id: SegmentId) { + info!("Deleting segment {:?}", segment_id); + let segment_filepaths_res = directory.ls_starting_with( + &*segment_id.uuid_string() + ); + + match segment_filepaths_res { + Ok(segment_filepaths) => { + for segment_filepath in &segment_filepaths { + if let Err(err) = directory.delete(&segment_filepath) { + match err { + FileError::FileDoesNotExist(_) => { + // this is normal behavior. + // the position file for instance may not exists. + } + FileError::IOError(err) => { + error!("Failed to remove {:?} : {:?}", segment_id, err); + } + } + } + } + } + Err(_) => { + error!("Failed to list files of segment {:?} for deletion.", segment_id.uuid_string()); + } + } +} + + impl Index { /// Creates a new index using the `RAMDirectory`. /// @@ -76,7 +116,7 @@ impl Index { /// Creates a new index given a directory and an `IndexMeta`. fn create_from_metas(directory: Box, metas: IndexMeta) -> Result { let schema = metas.schema.clone(); - let docstamp = metas.docstamp; + let docstamp = metas.opstamp; // TODO log somethings is uncommitted is not empty. let index = Index { directory: directory, @@ -143,27 +183,33 @@ impl Index { /// Returns the list of segments that are searchable pub fn searchable_segments(&self) -> Result> { - let searchable_segment_ids = self.searchable_segment_ids()?; + let metas = load_metas(self.directory())?; + let searchable_segment_ids = metas + .committed_segments + .iter() + .map(|segment_meta| segment_meta.segment_id) + .collect::>(); + let commit_opstamp = metas.opstamp; Ok(searchable_segment_ids .into_iter() - .map(|segment_id| self.segment(segment_id)) + .map(|segment_id| self.segment(segment_id, commit_opstamp)) .collect()) } - + /// Remove all of the file associated with the segment. /// /// This method cannot fail. If a problem occurs, /// some files may end up never being removed. /// The error will only be logged. pub fn delete_segment(&self, segment_id: SegmentId) { - self.segment(segment_id).delete(); + delete_segment(self.directory(), segment_id); } /// Return a segment object given a `segment_id` /// /// The segment may or may not exist. - pub fn segment(&self, segment_id: SegmentId) -> Segment { - create_segment(self.clone(), segment_id) + pub fn segment(&self, segment_id: SegmentId, commit_opstamp: u64) -> Segment { + create_segment(self.clone(), segment_id, commit_opstamp) } /// Return a reference to the index directory. @@ -179,24 +225,22 @@ impl Index { /// Reads the meta.json and returns the list of /// committed segments. pub fn committed_segments(&self) -> Result> { + Ok(load_metas(self.directory())?.committed_segments) } /// Returns the list of segment ids that are searchable. pub fn searchable_segment_ids(&self) -> Result> { - self.committed_segments() - .map(|commited_segments| { - commited_segments - .iter() - .map(|segment_meta| segment_meta.segment_id) - .collect() - }) - + Ok(load_metas(self.directory())? + .committed_segments + .iter() + .map(|segment_meta| segment_meta.segment_id) + .collect()) } /// Creates a new segment. - pub fn new_segment(&self) -> Segment { - self.segment(SegmentId::generate_random()) + pub fn new_segment(&self, opstamp: u64) -> Segment { + self.segment(SegmentId::generate_random(), opstamp) } /// Creates a new generation of searchers after diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index c6d7f4bc5..c97ed9570 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -14,7 +14,7 @@ pub struct IndexMeta { pub committed_segments: Vec, pub uncommitted_segments: Vec, pub schema: Schema, - pub docstamp: u64, + pub opstamp: u64, } impl IndexMeta { @@ -23,7 +23,7 @@ impl IndexMeta { committed_segments: Vec::new(), uncommitted_segments: Vec::new(), schema: schema, - docstamp: 0u64, + opstamp: 0u64, } } } diff --git a/src/core/segment.rs b/src/core/segment.rs index aab25574c..4e719c0e3 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -11,13 +11,12 @@ use core::Index; use std::result; use directory::error::{FileError, OpenWriteError}; - - /// A segment is a piece of the index. #[derive(Clone)] pub struct Segment { index: Index, segment_id: SegmentId, + commit_opstamp: u64, } impl fmt::Debug for Segment { @@ -30,10 +29,11 @@ impl fmt::Debug for Segment { /// Creates a new segment given an `Index` and a `SegmentId` /// /// The function is here to make it private outside `tantivy`. -pub fn create_segment(index: Index, segment_id: SegmentId) -> Segment { +pub fn create_segment(index: Index, segment_id: SegmentId, commit_opstamp: u64) -> Segment { Segment { index: index, segment_id: segment_id, + commit_opstamp: commit_opstamp, } } @@ -59,42 +59,6 @@ impl Segment { self.segment_id.relative_path(component) } - /// Deletes all of the document of the segment. - /// This is called when there is a merge or a rollback. - /// - /// # Disclaimer - /// If deletion of a file fails (e.g. a file - /// was read-only.), the method does not - /// fail and just logs an error when it fails. - pub fn delete(&self) { - info!("Deleting segment {:?}", self.segment_id); - let segment_filepaths_res = self.index.directory().ls_starting_with( - &*self.segment_id.uuid_string() - ); - - match segment_filepaths_res { - Ok(segment_filepaths) => { - for segment_filepath in &segment_filepaths { - if let Err(err) = self.index.directory().delete(&segment_filepath) { - match err { - FileError::FileDoesNotExist(_) => { - // this is normal behavior. - // the position file for instance may not exists. - } - FileError::IOError(err) => { - error!("Failed to remove {:?} : {:?}", self.segment_id, err); - } - } - } - } - } - Err(_) => { - error!("Failed to list files of segment {:?} for deletion.", self.segment_id.uuid_string()); - } - } - } - - /// Open one of the component file for read. pub fn open_read(&self, component: SegmentComponent) -> result::Result { let path = self.relative_path(component); diff --git a/src/core/segment_meta.rs b/src/core/segment_meta.rs index ef7521818..3d001e896 100644 --- a/src/core/segment_meta.rs +++ b/src/core/segment_meta.rs @@ -5,6 +5,7 @@ pub struct SegmentMeta { pub segment_id: SegmentId, pub num_docs: u32, pub num_deleted_docs: u32, + pub opstamp: u64, } #[cfg(test)] @@ -14,6 +15,7 @@ impl SegmentMeta { segment_id: segment_id, num_docs: num_docs, num_deleted_docs: 0, + opstamp: 0u64, } } } \ No newline at end of file diff --git a/src/error.rs b/src/error.rs index 6699c7134..6a82c239d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -32,7 +32,7 @@ pub enum Error { /// The data within is corrupted. /// /// For instance, it contains invalid JSON. - CorruptedFile(PathBuf, Box), + CorruptedFile(PathBuf, Box), /// Invalid argument was passed by the user. InvalidArgument(String), /// An Error happened in one of the thread diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index e4578c51f..84e375acf 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -125,6 +125,7 @@ fn index_documents(heap: &mut Heap, segment_id: segment_id, num_docs: num_docs, num_deleted_docs: num_deleted_docs as u32, + opstamp: last_opstamp, }; let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone()); @@ -185,14 +186,13 @@ impl IndexWriter { let generation = self.generation; - let join_handle: JoinHandle> = try!(thread::Builder::new() + let join_handle: JoinHandle> = + thread::Builder::new() .name(format!("indexing thread {} for gen {}", self.worker_id, generation)) .spawn(move || { let mut delete_cursor_clone = delete_cursor.clone(); loop { - let segment = index.new_segment(); - let mut document_iterator = document_receiver_clone.clone() .into_iter() .peekable(); @@ -203,25 +203,28 @@ impl IndexWriter { // this is a valid guarantee as the // peeked document now belongs to // our local iterator. - if document_iterator.peek().is_some() { - let valid_generation = try!(index_documents(&mut heap, - segment, - &schema, - generation, - &mut document_iterator, - &mut segment_updater, - &mut delete_cursor_clone)); - if valid_generation { - return Ok(()); - } - } else { + let opstamp: u64; + if let Some(operation) = document_iterator.peek() { + opstamp = operation.opstamp; + } + else { // No more documents. // Happens when there is a commit, or if the `IndexWriter` // was dropped. - return Ok(()); + opstamp = 0u64; + return Ok(()) } + + let segment = index.new_segment(opstamp); + let valid_generation = index_documents(&mut heap, + segment, + &schema, + generation, + &mut document_iterator, + &mut segment_updater, + &mut delete_cursor_clone)?; } - })); + })?; self.worker_id += 1; self.workers_join_handle.push(join_handle); Ok(()) @@ -308,8 +311,8 @@ impl IndexWriter { } /// Merges a given list of segments - pub fn merge(&mut self, segments: &[SegmentId]) -> impl Future { - self.segment_updater.start_merge(segments.to_vec()) + pub fn merge(&mut self, segment_ids: &[SegmentId]) -> impl Future { + self.segment_updater.start_merge(segment_ids) } /// Closes the current document channel send. diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 20ccfcdfa..41d0ded70 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -63,6 +63,14 @@ impl SegmentManager { }), } } + + pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { + let registers = self.read(); + registers + .committed + .segment_entry(segment_id) + .or_else(|| registers.uncommitted.segment_entry(segment_id)) + } // Lock poisoning should never happen : // The lock is acquired and released within this class, @@ -113,16 +121,17 @@ impl SegmentManager { registers_lock.uncommitted.add_segment_entry(segment_entry); } - pub fn end_merge(&self, merged_segment_ids: &[SegmentId], merged_segment_entry: SegmentEntry) { + pub fn end_merge(&self, merged_segment_metas: &[SegmentMeta], merged_segment_entry: SegmentEntry) { let mut registers_lock = self.write(); - if registers_lock.uncommitted.contains_all(merged_segment_ids) { - for segment_id in merged_segment_ids { + let merged_segment_ids: Vec = merged_segment_metas.iter().map(|meta| meta.segment_id).collect(); + if registers_lock.uncommitted.contains_all(&merged_segment_ids) { + for segment_id in &merged_segment_ids { registers_lock.uncommitted.remove_segment(segment_id); } registers_lock.uncommitted.add_segment_entry(merged_segment_entry); } - else if registers_lock.committed.contains_all(merged_segment_ids) { - for segment_id in merged_segment_ids { + else if registers_lock.committed.contains_all(&merged_segment_ids) { + for segment_id in &merged_segment_ids { registers_lock.committed.remove_segment(segment_id); } registers_lock.committed.add_segment_entry(merged_segment_entry); diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 8f8f20611..982e320c0 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -32,6 +32,10 @@ impl SegmentEntry { pub fn segment_id(&self) -> SegmentId { self.meta.segment_id } + + pub fn meta(&self) -> &SegmentMeta { + &self.meta + } fn start_merge(&mut self,) { self.state = SegmentState::InMerge; @@ -120,7 +124,6 @@ impl SegmentRegister { .collect() } - #[cfg(test)] pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { self.segment_states .get(&segment_id) diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 2622d97a1..d3e274d69 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -1,6 +1,7 @@ #![allow(for_kv_map)] use core::Index; +use Error; use core::Segment; use indexer::{MergePolicy, DefaultMergePolicy}; use core::SegmentId; @@ -35,13 +36,13 @@ use std::io::Write; use super::segment_manager::{SegmentManager, get_segments}; -fn create_metas(segment_manager: &SegmentManager, schema: Schema, docstamp: u64) -> IndexMeta { +fn create_metas(segment_manager: &SegmentManager, schema: Schema, opstamp: u64) -> IndexMeta { let (committed_segments, uncommitted_segments) = segment_manager.segment_metas(); IndexMeta { committed_segments: committed_segments, uncommitted_segments: uncommitted_segments, schema: schema, - docstamp: docstamp, + opstamp: opstamp, } } @@ -104,7 +105,7 @@ struct InnerSegmentUpdater { segment_manager: SegmentManager, merge_policy: RwLock>, merging_thread_id: AtomicUsize, - merging_threads: RwLock>>, + merging_threads: RwLock>>>, generation: AtomicUsize, } @@ -184,28 +185,52 @@ impl SegmentUpdater { } - pub fn start_merge(&self, segment_ids: Vec) -> impl Future { + pub fn start_merge(&self, segment_ids: &[SegmentId]) -> impl Future { - self.0.segment_manager.start_merge(&segment_ids); + self.0.segment_manager.start_merge(segment_ids); let segment_updater_clone = self.clone(); - + + let segment_ids_vec = segment_ids.to_vec(); + let merging_thread_id = self.get_merging_thread_id(); let (merging_future_send, merging_future_recv) = oneshot(); + + if segment_ids.is_empty() { + return merging_future_recv; + } + let merging_join_handle = thread::spawn(move || { - info!("Start merge: {:?}", segment_ids); + info!("Start merge: {:?}", segment_ids_vec); let ref index = segment_updater_clone.0.index; let schema = index.schema(); - let segments: Vec = segment_ids + let segment_metas: Vec = segment_ids_vec .iter() - .map(|&segment_id| index.segment(segment_id)) + .map(|segment_id| + segment_updater_clone.0.segment_manager + .segment_entry(segment_id) + .map(|segment_entry| segment_entry.meta().clone()) + .ok_or(Error::InvalidArgument(format!("Segment({:?}) does not exist anymore", segment_id))) + ) + .collect::>()?; + + let segments: Vec = segment_metas + .iter() + .map(|ref segment_metas| index.segment(segment_metas.segment_id, segment_metas.opstamp)) .collect(); // An IndexMerger is like a "view" of our merged segments. // TODO unwrap let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); - let mut merged_segment = index.new_segment(); + + let opstamp = segment_metas + .iter() + .map(|meta| meta.opstamp) + .max() + .unwrap(); + + let mut merged_segment = index.new_segment(opstamp); // ... we just serialize this index merger in our new segment // to merge the two segments. @@ -215,19 +240,20 @@ impl SegmentUpdater { segment_id: merged_segment.id(), num_docs: num_docs, num_deleted_docs: 0u32, + opstamp: opstamp, }; - + // TODO fix delete cursor let delete_queue = DeleteQueue::default(); let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); segment_updater_clone - .end_merge(segment_ids.clone(), segment_entry.clone()) + .end_merge(segment_metas.clone(), segment_entry.clone()) .wait() .unwrap(); merging_future_send.complete(segment_entry.clone()); segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id); - segment_entry + Ok(segment_entry) }); self.0.merging_threads.write().unwrap().insert(merging_thread_id, merging_join_handle); merging_future_recv @@ -242,26 +268,26 @@ impl SegmentUpdater { let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments); let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments); merge_candidates.extend_from_slice(&committed_merge_candidates[..]); - for MergeCandidate(segment_ids) in merge_candidates { - self.start_merge(segment_ids); + for MergeCandidate(segment_metas) in merge_candidates { + self.start_merge(&segment_metas); } } - fn end_merge(&self, - merged_segment_ids: Vec, + fn end_merge(&self, + merged_segment_metas: Vec, resulting_segment_entry: SegmentEntry) -> impl Future { self.run_async(move |segment_updater| { - segment_updater.0.segment_manager.end_merge(&merged_segment_ids, resulting_segment_entry); + segment_updater.0.segment_manager.end_merge(&merged_segment_metas, resulting_segment_entry); let mut directory = segment_updater.0.index.directory().box_clone(); save_metas( &segment_updater.0.segment_manager, segment_updater.0.index.schema(), segment_updater.0.index.docstamp(), directory.borrow_mut()).expect("Could not save metas."); - for segment_id in merged_segment_ids { - segment_updater.0.index.delete_segment(segment_id); + for segment_meta in merged_segment_metas { + segment_updater.0.index.delete_segment(segment_meta.segment_id); } }) diff --git a/src/postings/mod.rs b/src/postings/mod.rs index f9898b9fc..b7676710f 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -61,7 +61,7 @@ mod tests { let text_field = schema_builder.add_text_field("text", TEXT); let schema = schema_builder.build(); let index = Index::create_in_ram(schema); - let mut segment = index.new_segment(); + let mut segment = index.new_segment(0u64); let mut posting_serializer = PostingsSerializer::open(&mut segment).unwrap(); let term = Term::from_field_text(text_field, "abc"); posting_serializer.new_term(&term, 3).unwrap(); @@ -81,7 +81,7 @@ mod tests { let text_field = schema_builder.add_text_field("text", TEXT); let schema = schema_builder.build(); let index = Index::create_in_ram(schema.clone()); - let segment = index.new_segment(); + let segment = index.new_segment(0u64); let heap = Heap::with_capacity(10_000_000); { let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema).unwrap();