diff --git a/src/core/index.rs b/src/core/index.rs index 100714c89..afce57a21 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -39,7 +39,6 @@ pub struct Index { directory: Box, schema: Schema, searcher_pool: Arc>, - opstamp: u64, } @@ -117,12 +116,10 @@ impl Index { /// Creates a new index given a directory and an `IndexMeta`. fn create_from_metas(directory: Box, metas: IndexMeta) -> Result { let schema = metas.schema.clone(); - let opstamp = metas.opstamp; let index = Index { directory: directory, schema: schema, searcher_pool: Arc::new(Pool::new()), - opstamp: opstamp, }; try!(index.load_searchers()); Ok(index) @@ -146,7 +143,7 @@ impl Index { /// The opstamp is the number of documents that have been added /// from the beginning of time, and until the moment of the last commit. pub fn opstamp(&self) -> u64 { - self.opstamp + load_metas(self.directory()).unwrap().opstamp } /// Open a new index writer. Attempts to acquire a lockfile. @@ -294,7 +291,6 @@ impl Clone for Index { directory: self.directory.box_clone(), schema: self.schema.clone(), searcher_pool: self.searcher_pool.clone(), - opstamp: self.opstamp, } } } diff --git a/src/core/segment_id.rs b/src/core/segment_id.rs index 6515ab423..9e3a75d3d 100644 --- a/src/core/segment_id.rs +++ b/src/core/segment_id.rs @@ -79,7 +79,7 @@ impl Decodable for SegmentId { impl fmt::Debug for SegmentId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "SegmentId({:?})", self.uuid_string()) + write!(f, "Seg({:?})", self.short_uuid_string()) } } diff --git a/src/functional_test.rs b/src/functional_test.rs new file mode 100644 index 000000000..dd713d896 --- /dev/null +++ b/src/functional_test.rs @@ -0,0 +1,61 @@ +use std::collections::HashSet; +use rand::{thread_rng, Rng}; + +use schema::*; +use Index; +use Searcher; +use rand::distributions::{IndependentSample, Range}; + +fn check_index_content(searcher: &Searcher, vals: &HashSet) { + assert!(searcher.segment_readers().len() < 20); + assert_eq!(searcher.num_docs() as usize, vals.len()); +} + +#[test] +fn test_indexing() { + + let mut schema_builder = SchemaBuilder::default(); + + let id_field = schema_builder.add_u32_field("id", U32_INDEXED); + let multiples_field = schema_builder.add_u32_field("multiples", U32_INDEXED); + let schema = schema_builder.build(); + + let index = Index::create_from_tempdir(schema).unwrap(); + + let universe = Range::new(0u32, 20u32); + let mut rng = thread_rng(); + + let mut index_writer = index.writer_with_num_threads(3, 120_000_000).unwrap(); + + let mut committed_docs: HashSet = HashSet::new(); + let mut uncommitted_docs: HashSet = HashSet::new(); + + for n in 0..200 { + let random_val = universe.ind_sample(&mut rng); + if random_val == 0 { + index_writer.commit(); + committed_docs.extend(&uncommitted_docs); + uncommitted_docs.clear(); + index.load_searchers().unwrap(); + let searcher = index.searcher(); + // check that everything is correct. + check_index_content(&searcher, &committed_docs); + } + else { + if committed_docs.remove(&random_val) || + uncommitted_docs.remove(&random_val) { + let doc_id_term = Term::from_field_u32(id_field, random_val); + index_writer.delete_term(doc_id_term); + } + else { + uncommitted_docs.insert(random_val); + let mut doc = Document::new(); + doc.add_u32(id_field, random_val); + for i in 1u32..10u32 { + doc.add_u32(multiples_field, random_val * i); + } + index_writer.add_document(doc); + } + } + } +} diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 7b9633ea3..f2cc58df1 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -160,12 +160,25 @@ pub fn advance_deletes( segment: &mut Segment, delete_operations: &DeleteQueueSnapshot, doc_opstamps: &DocToOpstampMapping) -> Result { + + let segment_reader = SegmentReader::open(segment.clone())?; + let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize); let mut last_opstamp_opt: Option = None; + let previous_delete_opstamp_opt = segment.meta().delete_opstamp(); + for delete_op in delete_operations.iter() { + + // let's skip operations that have already been deleted.0u32 + if let Some(previous_delete_opstamp) = previous_delete_opstamp_opt { + if delete_op.opstamp <= previous_delete_opstamp { + continue; + } + } + // A delete operation should only affect // document that were inserted after it. // @@ -179,11 +192,11 @@ pub fn advance_deletes( delete_bitset.insert(deleted_doc as usize); } } - last_opstamp_opt = Some(delete_op.opstamp); } + last_opstamp_opt = Some(delete_op.opstamp); } - if let Some(last_opstamp) = last_opstamp_opt { + if let Some(last_opstamp) = last_opstamp_opt { for doc in 0u32..segment_reader.max_doc() { if segment_reader.is_deleted(doc) { delete_bitset.insert(doc as usize); @@ -194,6 +207,7 @@ pub fn advance_deletes( let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; write_delete_bitset(&delete_bitset, &mut delete_file)?; } + Ok(segment.meta().clone()) } @@ -365,6 +379,8 @@ impl IndexWriter { /// The opstamp at the last commit is returned. pub fn rollback(&mut self) -> Result { + info!("Rolling back to opstamp {}", self.committed_opstamp); + // by updating the generation in the segment updater, // pending add segment commands will be dismissed. self.generation += 1; @@ -428,6 +444,19 @@ impl IndexWriter { /// pub fn commit(&mut self) -> Result { + // here, because we join all of the worker threads, + // all of the segment update for this commit have been + // sent. + // + // No document belonging to the next generation have been + // pushed too, because add_document can only happen + // on this thread. + + // This will move uncommitted segments to the state of + // committed segments. + self.committed_opstamp = self.stamp(); + info!("committing {}", self.committed_opstamp); + // this will drop the current document channel // and recreate a new one channels. self.recreate_document_channel(); @@ -444,17 +473,7 @@ impl IndexWriter { try!(self.add_indexing_worker()); } - // here, because we join all of the worker threads, - // all of the segment update for this commit have been - // sent. - // - // No document belonging to the next generation have been - // pushed too, because add_document can only happen - // on this thread. - - // This will move uncommitted segments to the state of - // committed segments. - self.committed_opstamp = self.stamp(); + // wait for the segment update thread to have processed the info self.segment_updater @@ -473,13 +492,14 @@ impl IndexWriter { /// /// Like adds, the deletion itself will be visible /// only after calling `commit()`. - pub fn delete_term(&mut self, term: Term) { + pub fn delete_term(&mut self, term: Term) -> u64 { let opstamp = self.stamp(); let delete_operation = DeleteOperation { opstamp: opstamp, term: term, }; self.delete_queue.push(delete_operation); + opstamp } fn stamp(&mut self) -> u64 { @@ -498,6 +518,8 @@ impl IndexWriter { /// /// Currently it represents the number of documents that /// have been added since the creation of the index. + + // TODO remove return without Result<> pub fn add_document(&mut self, document: Document) -> io::Result { let opstamp = self.stamp(); let add_operation = AddOperation { diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index c8a917665..0c12e72ba 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -43,6 +43,10 @@ impl SegmentEntry { self.state } + pub fn set_state(&mut self, state: SegmentState) { + self.state = state; + } + pub fn set_doc_to_opstamp(&mut self, doc_to_opstamp: DocToOpstampMapping) { self.doc_to_opstamp = doc_to_opstamp; } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 8d01dee1a..250830a84 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -2,23 +2,17 @@ use super::segment_register::SegmentRegister; use std::sync::RwLock; use core::SegmentMeta; use core::SegmentId; -use indexer::SegmentEntry; +use indexer::{SegmentEntry, SegmentState}; + use std::sync::{RwLockReadGuard, RwLockWriteGuard}; use std::fmt::{self, Debug, Formatter}; +#[derive(Default)] struct SegmentRegisters { uncommitted: SegmentRegister, committed: SegmentRegister, } -impl Default for SegmentRegisters { - fn default() -> SegmentRegisters { - SegmentRegisters { - uncommitted: SegmentRegister::default(), - committed: SegmentRegister::default() - } - } -} /// The segment manager stores the list of segments @@ -26,6 +20,7 @@ impl Default for SegmentRegisters { /// /// It guarantees the atomicity of the /// changes (merges especially) +#[derive(Default)] pub struct SegmentManager { registers: RwLock, } @@ -71,7 +66,12 @@ impl SegmentManager { ); segment_entries } - + + pub fn segment_state(&self, segment_id: &SegmentId) -> Option { + self.segment_entry(segment_id) + .map(|segment_entry| segment_entry.state()) + } + pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { let registers = self.read(); registers @@ -100,11 +100,22 @@ impl SegmentManager { segment_ids } - pub fn commit(&self, segment_entries: Vec) { + pub fn commit(&self, segment_metas: Vec) { + let committed_segment_entries = segment_metas + .into_iter() + .map(|segment_meta| { + let segment_id = segment_meta.id(); + let mut segment_entry = SegmentEntry::new(segment_meta); + if let Some(state) = self.segment_state(&segment_id) { + segment_entry.set_state(state); + } + segment_entry + }) + .collect::>(); let mut registers_lock = self.write(); registers_lock.committed.clear(); registers_lock.uncommitted.clear(); - for segment_entry in segment_entries { + for segment_entry in committed_segment_entries { registers_lock.committed.add_segment_entry(segment_entry); } } @@ -121,6 +132,9 @@ impl SegmentManager { registers_lock.committed.start_merge(segment_id); } } + else { + error!("Merge operation sent for segments that are not all uncommited or commited."); + } } pub fn add_segment(&self, segment_entry: SegmentEntry) { @@ -152,15 +166,3 @@ impl SegmentManager { registers_lock.committed.segment_metas() } } - - -impl Default for SegmentManager { - fn default() -> SegmentManager { - SegmentManager { - registers: RwLock::new( SegmentRegisters { - uncommitted: SegmentRegister::default(), - committed: SegmentRegister::default(), - }), - } - } -} \ No newline at end of file diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 5f2216342..418d92ef7 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -15,6 +15,7 @@ use indexer::segment_entry::SegmentEntry; /// segments that are currently searchable, /// and by the index merger to identify /// merge candidates. +#[derive(Default)] pub struct SegmentRegister { segment_states: HashMap, } @@ -110,13 +111,6 @@ impl SegmentRegister { } } -impl Default for SegmentRegister { - fn default() -> SegmentRegister { - SegmentRegister { - segment_states: HashMap::new(), - } - } -} #[cfg(test)] mod tests { diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index f7f9ea868..768fe7ea0 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -105,8 +105,7 @@ struct InnerSegmentUpdater { impl SegmentUpdater { - pub fn new(index: Index, delete_queue: DeleteQueue) -> Result - { + pub fn new(index: Index, delete_queue: DeleteQueue) -> Result { let segments = index.segments()?; let segment_manager = SegmentManager::from_segments(segments); Ok( @@ -177,11 +176,7 @@ impl SegmentUpdater { pub fn commit(&self, opstamp: u64) -> impl Future { self.run_async(move |segment_updater| { let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes"); - let segment_entries = segment_metas - .into_iter() - .map(SegmentEntry::new) - .collect::>(); - segment_updater.0.segment_manager.commit(segment_entries); + segment_updater.0.segment_manager.commit(segment_metas); let mut directory = segment_updater.0.index.directory().box_clone(); save_metas( segment_updater.0.segment_manager.committed_segment_metas(), @@ -241,13 +236,15 @@ impl SegmentUpdater { .map(|segment_meta| index.segment(segment_meta)) .collect(); - // An IndexMerger is like a "view" of our merged segments. + // An IndexMerger is like a "view" of our merged segments. let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?; let mut merged_segment = index.new_segment(); // ... we just serialize this index merger in our new segment // to merge the two segments. + let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed"); + let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); let mut segment_meta = SegmentMeta::new(merged_segment.id()); segment_meta.set_num_docs(num_docs); @@ -257,7 +254,6 @@ impl SegmentUpdater { .end_merge(segment_metas.clone(), segment_entry.clone()) .wait() .unwrap(); - merging_future_send.complete(segment_entry.clone()); segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id); Ok(segment_entry) diff --git a/src/lib.rs b/src/lib.rs index 61285583a..a34a8e93b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,10 @@ extern crate libc; #[cfg(test)] extern crate test; #[cfg(test)] extern crate rand; + +#[cfg(test)] +mod functional_test; + #[macro_use] mod macros { macro_rules! get( @@ -185,8 +189,10 @@ mod tests { use Index; use core::SegmentReader; use query::BooleanQuery; + use postings::SegmentPostingsOption; use schema::*; use DocSet; + use IndexWriter; use Postings; #[test] @@ -290,7 +296,7 @@ mod tests { #[test] - fn test_delete_postings() { + fn test_delete_postings1() { let mut schema_builder = SchemaBuilder::default(); let text_field = schema_builder.add_text_field("text", TEXT); let schema = schema_builder.build(); @@ -392,10 +398,8 @@ mod tests { { index_writer.delete_term(Term::from_field_text(text_field, "c")); } - index_writer.rollback().unwrap(); - { - index_writer.delete_term(Term::from_field_text(text_field, "a")); - } + index_writer.rollback().unwrap(); + index_writer.delete_term(Term::from_field_text(text_field, "a")); index_writer.commit().unwrap(); } { @@ -425,6 +429,63 @@ mod tests { } + #[test] + fn test_indexed_u32() { + let mut schema_builder = SchemaBuilder::default(); + let field = schema_builder.add_u32_field("text", U32_INDEXED); + let schema = schema_builder.build(); + + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); + index_writer.add_document( + doc!(field=>1) + ); + index_writer.commit().unwrap(); + index.load_searchers().unwrap(); + let searcher = index.searcher(); + let term = Term::from_field_u32(field, 1u32); + let mut postings = searcher.segment_reader(0).read_postings(&term, SegmentPostingsOption::NoFreq).unwrap(); + assert!(postings.advance()); + assert_eq!(postings.doc(), 0); + assert!(!postings.advance()); + } + + #[test] + fn test_delete_postings2() { + let mut schema_builder = SchemaBuilder::default(); + let text_field = schema_builder.add_text_field("text", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + + // writing the segment + let mut index_writer = index.writer_with_num_threads(2, 40_000_000).unwrap(); + + let add_document = |index_writer: &mut IndexWriter, val: &'static str| { + let doc = doc!(text_field=>val); + index_writer.add_document(doc); + }; + + let remove_document = |index_writer: &mut IndexWriter, val: &'static str| { + let delterm = Term::from_field_text(text_field, val); + index_writer.delete_term(delterm); + }; + + add_document(&mut index_writer, "63"); + add_document(&mut index_writer, "70"); + add_document(&mut index_writer, "34"); + add_document(&mut index_writer, "1"); + add_document(&mut index_writer, "38"); + add_document(&mut index_writer, "33"); + add_document(&mut index_writer, "40"); + add_document(&mut index_writer, "17"); + remove_document(&mut index_writer, "38"); + remove_document(&mut index_writer, "34"); + index_writer.commit().unwrap(); + index.load_searchers().unwrap(); + let searcher = index.searcher(); + assert_eq!(searcher.num_docs(), 6); + } + #[test] fn test_termfreq() { let mut schema_builder = SchemaBuilder::default();