diff --git a/src/core/index.rs b/src/core/index.rs index 934e9657e..123ec14c1 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -70,10 +70,10 @@ fn load_metas(directory: &Directory) -> Result { Ok(loaded_meta) } -pub fn commit(index: &mut Index, docstamp: u64) { - index.docstamp = docstamp; - index.segment_manager.commit(); -} +// pub fn commit(index: &mut Index, docstamp: u64) { +// index.docstamp = docstamp; +// index.segment_manager.commit(); +// } /// Tantivy's Search Index pub struct Index { diff --git a/src/datastruct/stacker/heap.rs b/src/datastruct/stacker/heap.rs index df73ab242..895dd6743 100644 --- a/src/datastruct/stacker/heap.rs +++ b/src/datastruct/stacker/heap.rs @@ -1,8 +1,6 @@ use std::cell::UnsafeCell; use std::mem; use std::ptr; -use std::iter; - /// `BytesRef` refers to a slice in tantivy's custom `Heap`. #[derive(Copy, Clone)] @@ -103,11 +101,23 @@ struct InnerHeap { next_heap: Option>, } +/// initializing a long Vec is crazy slow in +/// debug mode. +fn allocate_fast(num_bytes: usize) -> Vec { + let mut scavenged = Vec::with_capacity(num_bytes); + unsafe { + let ptr = scavenged.as_mut_ptr(); + mem::forget(scavenged); + Vec::from_raw_parts(ptr, num_bytes, num_bytes) + } +} + impl InnerHeap { pub fn with_capacity(num_bytes: usize) -> InnerHeap { + let buffer: Vec = allocate_fast(num_bytes); InnerHeap { - buffer: iter::repeat(0u8).take(num_bytes).collect(), + buffer: buffer, buffer_len: num_bytes as u32, next_heap: None, used: 0u32, diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 904689ea3..12695717d 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -11,6 +11,7 @@ use std::clone::Clone; use std::io; use indexer::MergePolicy; use std::thread; +use std::mem; use indexer::merger::IndexMerger; use core::SegmentId; use datastruct::stacker::Heap; @@ -18,7 +19,7 @@ use std::mem::swap; use chan; use core::SegmentMeta; use super::super::core::index::get_segment_manager; -use super::segment_manager::{SegmentManager, get_segment_ready_for_commit}; +use super::segment_manager::{CommitState, SegmentManager, get_segment_ready_for_commit}; use Result; use Error; @@ -63,6 +64,10 @@ pub struct IndexWriter { docstamp: u64, } +// IndexWriter cannot be sent to another thread. +impl !Send for IndexWriter {} +impl !Sync for IndexWriter {} + fn index_documents(heap: &mut Heap, segment: Segment, @@ -97,65 +102,73 @@ pub enum SegmentUpdate { EndMerge(Vec, SegmentMeta), CancelGeneration, NewGeneration, + Terminate, + Commit, } -// Process a single segment update. -// -// If the segment manager has been changed a result, -// return true. (else return false) -fn process_segment_update( +impl SegmentUpdate { + + // Process a single segment update. + pub fn process( + self, index: &Index, segment_manager: &SegmentManager, - segment_update: SegmentUpdate, is_cancelled_generation: &mut bool) -> bool { + + info!("Segment update: {:?}", self); - info!("Segment update: {:?}", segment_update); - - match segment_update { - SegmentUpdate::AddSegment(segment_meta) => { - if !*is_cancelled_generation { - segment_manager.add_segment(segment_meta); + match self { + SegmentUpdate::AddSegment(segment_meta) => { + if !*is_cancelled_generation { + segment_manager.add_segment(segment_meta); + } + else { + // rollback has been called and this + // segment actually belong to the + // documents that have been dropped. + // + // Let's just remove its files. + index.delete_segment(segment_meta.segment_id); + } } - else { - index.delete_segment(segment_meta.segment_id); + SegmentUpdate::EndMerge(segment_ids, segment_meta) => { + segment_manager.end_merge(&segment_ids, &segment_meta); + for segment_id in segment_ids { + index.delete_segment(segment_id); + } } - true - }, - SegmentUpdate::EndMerge(segment_ids, segment_meta) => { - segment_manager.end_merge(&segment_ids, &segment_meta); - for segment_id in segment_ids { - index.delete_segment(segment_id); + SegmentUpdate::CancelGeneration => { + // Called during rollback. The segment + // that will arrive will be ignored + // until a NewGeneration is update arrives. + *is_cancelled_generation = true; + } + SegmentUpdate::NewGeneration => { + // After rollback, we can resume + // indexing new documents. + *is_cancelled_generation = false; + } + SegmentUpdate::Commit => { + segment_manager.commit(); + } + SegmentUpdate::Terminate => { + return true; } - true - }, - SegmentUpdate::CancelGeneration => { - *is_cancelled_generation = true; - false - }, - SegmentUpdate::NewGeneration => { - *is_cancelled_generation = false; - false } + return false; } } - -fn consider_merge_options(index: &Index, merge_policy: &MergePolicy) -> Vec { - let segment_manager = get_segment_manager(index); - let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&*segment_manager); - // committed segments cannot be merged with uncommitted_segments. - let mut merge_candidates = merge_policy.compute_merge_candidates(&committed_segments); - merge_candidates.extend_from_slice(&merge_policy.compute_merge_candidates(&uncommitted_segments)[..]); + + +fn consider_merge_options(segment_manager: &SegmentManager, merge_policy: &MergePolicy) -> Vec { + let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager); + // Committed segments cannot be merged with uncommitted_segments. + // We therefore consider merges using these two sets of segments independantly. + let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments); + merge_candidates.extend_from_slice(&merge_policy.compute_merge_candidates(&committed_segments)[..]); merge_candidates } -fn on_segment_change(index: &mut Index) -> Result<()> { - // saving the meta file. - try!(index.save_metas()); - // update the searcher so that they eventually will - // use the new segments. - try!(index.load_searchers()); - Ok(()) -} // Consumes the `segment_update_receiver` channel // for segment updates and apply them. @@ -168,55 +181,94 @@ fn process_segment_updates(mut index: Index, segment_manager: &SegmentManager, segment_update_receiver: SegmentUpdateReceiver, segment_update_sender: SegmentUpdateSender) { + let mut option_segment_update_sender = Some(segment_update_sender); let mut is_cancelled_generation = false; + let mut generation = segment_manager.generation(); let merge_policy = index.get_merge_policy(); + for segment_update in segment_update_receiver { - let has_changed = process_segment_update( - &index, - segment_manager, - segment_update, - &mut is_cancelled_generation); - if has_changed { - on_segment_change(&mut index); + if segment_update.process( + &index, + segment_manager, + &mut is_cancelled_generation) { + option_segment_update_sender = None; + }; - let segment_manager = get_segment_manager(&index); + let new_generation = segment_manager.generation(); + + // we check the generation number as if it was + // dirty-bit. If the value is different + // to our generation, then the segment_manager has + // been update updated and we need to + // - save meta.json + // - update the searchers + // - consider possible segment merge - for MergeCandidate(segment_ids) in consider_merge_options(&index, &*merge_policy) { - segment_manager.start_merge(&segment_ids); - let index_clone = index.clone(); - let segment_update_sender_clone = segment_update_sender.clone(); - thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || { - info!("Start merge: {:?}", segment_ids); - let schema = index_clone.schema(); - let segments: Vec = segment_ids - .iter() - .map(|&segment_id| index_clone.segment(segment_id)) - .collect(); - // An IndexMerger is like a "view" of our merged segments. - // TODO unwrap - let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap(); - let mut merged_segment = index_clone.new_segment(); - // ... we just serialize this index merger in our new segment - // to merge the two segments. - let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap(); - let num_docs = merger.write(segment_serializer).unwrap(); - let segment_meta = SegmentMeta { - segment_id: merged_segment.id(), - num_docs: num_docs, - }; - let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta); - segment_update_sender_clone.send(segment_update); - }).expect("Failed to spawn merge thread"); + if generation != new_generation { + generation = new_generation; + + // saving the meta file. + index.save_metas().expect("Could not save metas."); + + // update the searchers so that they eventually will + // use the new segments. + // TODO eventually have this work through watching meta.json + // so that an external process stays up to date as well. + index.load_searchers().expect("Could not load new searchers."); + + if let Some(ref segment_update_sender) = option_segment_update_sender { + for MergeCandidate(segment_ids) in consider_merge_options(&segment_manager, &*merge_policy) { + segment_manager.start_merge(&segment_ids); + let index_clone = index.clone(); + let segment_update_sender_clone = segment_update_sender.clone(); + thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || { + info!("Start merge: {:?}", segment_ids); + let schema = index_clone.schema(); + let segments: Vec = segment_ids + .iter() + .map(|&segment_id| index_clone.segment(segment_id)) + .collect(); + // An IndexMerger is like a "view" of our merged segments. + // TODO unwrap + let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap(); + let mut merged_segment = index_clone.new_segment(); + // ... we just serialize this index merger in our new segment + // to merge the two segments. + let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap(); + let num_docs = merger.write(segment_serializer).unwrap(); + let segment_meta = SegmentMeta { + segment_id: merged_segment.id(), + num_docs: num_docs, + }; + let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta); + segment_update_sender_clone.send(segment_update); + }).expect("Failed to spawn merge thread"); + } } + } } + } impl IndexWriter { - pub fn wait_merging_threads(self) -> Result<()> { + pub fn wait_merging_threads(mut self) -> Result<()> { + + self.segment_update_sender.send(SegmentUpdate::Terminate); + drop(self.segment_update_sender); - info!("Joining update thread"); + + // this will stop the indexing thread, + // dropping the last reference to the segment_update_sender. + drop(self.document_sender); + + let mut v = Vec::new(); + mem::swap(&mut v, &mut self.workers_join_handle); + for join_handle in v { + join_handle.join().expect("Indexer has failed"); + } + drop(self.workers_join_handle); self.segment_update_thread .join() .map_err(|err| { @@ -329,11 +381,37 @@ impl IndexWriter { /// Merges a given list of segments pub fn merge(&mut self, segments: &[Segment]) -> Result<()> { - // TODO fix commit or uncommited? + + if segments.len() < 2 { + // no segments or one segment? nothing to do. + return Ok(()); + } + + + let segment_manager = get_segment_manager(&self.index); + + { + // let's check that all these segments are in the same + // committed/uncommited state. + let first_commit_state = segment_manager.is_committed(segments[0].id()); + + for segment in segments { + let commit_state = segment_manager.is_committed(segment.id()); + if commit_state == CommitState::Missing { + return Err(Error::InvalidArgument(format!("Segment {:?} is not in the index", segments[0].id()))); + } + if commit_state != first_commit_state { + return Err(Error::InvalidArgument(String::from("You may not merge segments that are heterogenously in committed and uncommited."))); + } + } + } + let schema = self.index.schema(); + // An IndexMerger is like a "view" of our merged segments. let merger = try!(IndexMerger::open(schema, segments)); let mut merged_segment = self.index.new_segment(); + // ... we just serialize this index merger in our new segment // to merge the two segments. let segment_serializer = try!(SegmentSerializer::for_segment(&mut merged_segment)); @@ -343,7 +421,7 @@ impl IndexWriter { segment_id: merged_segment.id(), num_docs: num_docs, }; - let segment_manager = get_segment_manager(&self.index); + segment_manager.end_merge(&merged_segment_ids, &segment_meta); try!(self.index.load_searchers()); Ok(()) @@ -351,13 +429,13 @@ impl IndexWriter { /// Closes the current document channel send. /// and replace all the channels by new ones. - /// + /// /// The current workers will keep on indexing /// the pending document and stop /// when no documents are remaining. /// /// Returns the former segment_ready channel. - fn recreate_channel(&mut self,) -> DocumentReceiver { + fn recreate_document_channel(&mut self,) -> DocumentReceiver { let (mut document_sender, mut document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); swap(&mut self.document_sender, &mut document_sender); swap(&mut self.document_receiver, &mut document_receiver); @@ -378,7 +456,7 @@ impl IndexWriter { // we cannot drop segment ready receiver yet // as it would block the workers. - let document_receiver = self.recreate_channel(); + let document_receiver = self.recreate_document_channel(); // Drains the document receiver pipeline : // Workers don't need to index the pending documents. @@ -442,8 +520,9 @@ impl IndexWriter { /// pub fn commit(&mut self,) -> Result { - // this will drop the current channel - self.recreate_channel(); + // this will drop the current document channel + // and recreate a new one channels. + self.recreate_document_channel(); // Docstamp of the last document in this commit. let commit_docstamp = self.docstamp; @@ -461,7 +540,9 @@ impl IndexWriter { try!(self.add_indexing_worker()); } - super::super::core::index::commit(&mut self.index, commit_docstamp); + self.segment_update_sender.send(SegmentUpdate::Commit); + + // super::super::core::index::commit(&mut self.index, commit_docstamp); try!(self.on_change()); Ok(commit_docstamp) } @@ -546,4 +627,39 @@ mod tests { index.searcher(); } + + + #[test] + fn test_with_merges() { + let mut schema_builder = schema::SchemaBuilder::default(); + let text_field = schema_builder.add_text_field("text", schema::TEXT); + let index = Index::create_in_ram(schema_builder.build()); + let num_docs_containing = |s: &str| { + let searcher = index.searcher(); + let term_a = Term::from_field_text(text_field, s); + searcher.doc_freq(&term_a) + }; + { + // writing the segment + let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap(); + // create 10 segments with 100 tiny docs + for _doc in 0..100 { + let mut doc = Document::default(); + doc.add_text(text_field, "a"); + index_writer.add_document(doc).unwrap(); + } + index_writer.commit().expect("commit failed"); + for _doc in 0..100 { + let mut doc = Document::default(); + doc.add_text(text_field, "a"); + index_writer.add_document(doc).unwrap(); + } + // this should create 8 segments and trigger a merge. + index_writer.commit().expect("commit failed"); + index_writer.wait_merging_threads().expect("waiting merging thread failed"); + assert_eq!(num_docs_containing("a"), 200); + assert_eq!(index.searchable_segments().len(), 1); + } + } + } \ No newline at end of file diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 049902fb8..d6a3a8915 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -4,17 +4,25 @@ use core::SegmentMeta; use core::SegmentId; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; use std::fmt::{self, Debug, Formatter}; +use std::sync::atomic::{AtomicUsize, Ordering}; struct SegmentRegisters { uncommitted: SegmentRegister, committed: SegmentRegister, } +#[derive(Eq, PartialEq)] +pub enum CommitState { + Committed, + Uncommitted, + Missing, +} + impl Default for SegmentRegisters { fn default() -> SegmentRegisters { SegmentRegisters { uncommitted: SegmentRegister::default(), - committed: SegmentRegister::default(), + committed: SegmentRegister::default() } } } @@ -27,6 +35,12 @@ impl Default for SegmentRegisters { /// changes (merges especially) pub struct SegmentManager { registers: RwLock, + // generation is an ever increasing counter that + // is incremented whenever we modify + // the segment manager. It can be useful for debugging + // purposes, and it also acts as a "dirty" marker, + // to detect when the `meta.json` should be written. + generation: AtomicUsize, } impl Debug for SegmentManager { @@ -49,27 +63,47 @@ pub fn get_segment_ready_for_commit(segment_manager: &SegmentManager,) -> (Vec CommitState { + let lock = self.read(); + if lock.uncommitted.contains(segment_id) { + CommitState::Uncommitted + } + else if lock.committed.contains(segment_id) { + CommitState::Committed + } + else { + CommitState::Missing + } + } pub fn from_segments(segment_metas: Vec) -> SegmentManager { SegmentManager { registers: RwLock::new( SegmentRegisters { uncommitted: SegmentRegister::default(), committed: SegmentRegister::from(segment_metas), - }) + }), + generation: AtomicUsize::default(), } } - // Lock poisoning should never happen : // The lock is acquired and released within this class, // and the operations cannot panic. fn read(&self,) -> RwLockReadGuard { self.registers.read().expect("Failed to acquire read lock on SegmentManager.") } + fn write(&self,) -> RwLockWriteGuard { + self.generation.fetch_add(1, Ordering::Release); self.registers.write().expect("Failed to acquire write lock on SegmentManager.") } + pub fn generation(&self,) -> usize { + self.generation.load(Ordering::Acquire) + } + /// Removes all of the uncommitted segments /// and returns them. pub fn rollback(&self,) -> Vec { diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 7f4af4f4e..777c14670 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -4,7 +4,7 @@ use core::SegmentMeta; use std::fmt; use std::fmt::{Debug, Formatter}; -#[derive(Clone, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq, Debug)] pub enum SegmentState { Ready, InMerge, @@ -105,6 +105,11 @@ impl SegmentRegister { .map(|segment_entry| segment_entry.clone()) } + pub fn contains(&self, segment_id: SegmentId) -> bool { + self.segment_states.contains_key(&segment_id) + } + + pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool { segment_ids .iter() diff --git a/src/lib.rs b/src/lib.rs index 58302abfe..5bea7c2e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ #![allow(unknown_lints)] // for the clippy lint options #![allow(module_inception)] +#![feature(optin_builtin_traits)] #![feature(binary_heap_extras)] #![feature(conservative_impl_trait)] #![cfg_attr(test, feature(test))]