diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 5e8676882..f1713f931 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -147,7 +147,6 @@ impl SegmentReader { .open_read(SegmentComponent::POSITIONS) .unwrap_or_else(|_| ReadOnlySource::empty()); - // TODO 0u64 let delete_bitset = if segment.meta().has_deletes() { let delete_data = segment.open_read(SegmentComponent::DELETE)?; diff --git a/src/datastruct/skip/skiplist_builder.rs b/src/datastruct/skip/skiplist_builder.rs index b83406029..9806a69af 100644 --- a/src/datastruct/skip/skiplist_builder.rs +++ b/src/datastruct/skip/skiplist_builder.rs @@ -36,7 +36,7 @@ impl LayerBuilder { fn insert(&mut self, doc_id: DocId, value: &T) -> io::Result> { self.remaining -= 1; self.len += 1; - let offset = self.written_size() as u32; // TODO not sure if we want after or here + let offset = self.written_size() as u32; try!(doc_id.serialize(&mut self.buffer)); try!(value.serialize(&mut self.buffer)); Ok(if self.remaining == 0 { diff --git a/src/indexer/doc_opstamp_mapping.rs b/src/indexer/doc_opstamp_mapping.rs index 843002416..16eb1ff28 100644 --- a/src/indexer/doc_opstamp_mapping.rs +++ b/src/indexer/doc_opstamp_mapping.rs @@ -1,6 +1,22 @@ use std::sync::Arc; use DocId; + +// Doc to opstamp is used to identify which +// document should be deleted. +// +// Since the docset matching the query of a delete operation +// is not computed right when the delete operation is received, +// we need to find a way to evaluate, for each document, +// whether the document was added before or after +// the delete operation. This anteriority is determined by comparing +// the opstamp of the document. +// +// The doc to opstamp mapping stores precisely an array +// indexed by doc id and storing the opstamp of the document. +// +// This mapping is (for the moment) strictly increasing +// because of the way document ids are allocated.
#[derive(Clone)] pub enum DocToOpstampMapping { WithMap(Arc<Vec<u64>>), @@ -13,12 +29,18 @@ impl From<Vec<u64>> for DocToOpstampMapping { } } -impl DocToOpstampMapping { - // TODO Unit test - pub fn compute_doc_limit(&self, opstamp: u64) -> DocId { +impl DocToOpstampMapping { + + /// Given an opstamp, returns the limit doc id L + /// such that for all doc ids D, + /// D >= L iff opstamp(D) >= `target_opstamp`. + /// + /// The edge case where `target_opstamp` equals some doc opstamp is in practice + /// never hit. + pub fn compute_doc_limit(&self, target_opstamp: u64) -> DocId { match *self { DocToOpstampMapping::WithMap(ref doc_opstamps) => { - match doc_opstamps.binary_search(&opstamp) { + match doc_opstamps.binary_search(&target_opstamp) { Ok(doc_id) => doc_id as DocId, Err(doc_id) => doc_id as DocId, } @@ -28,3 +50,44 @@ impl DocToOpstampMapping { } } +#[cfg(test)] +mod tests { + + use super::DocToOpstampMapping; + + #[test] + fn test_doc_to_opstamp_mapping_none() { + let doc_to_opstamp_mapping = DocToOpstampMapping::None; + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(1), u32::max_value()); + } + + #[test] + fn test_doc_to_opstamp_mapping_complex() { + { + let doc_to_opstamp_mapping = DocToOpstampMapping::from(vec!()); + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0); + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 0); + } + { + let doc_to_opstamp_mapping = DocToOpstampMapping::from(vec!(1u64)); + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0); + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 1); + } + { + let doc_to_opstamp_mapping = DocToOpstampMapping::from(vec!(1u64, 12u64, 17u64, 23u64)); + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0); + for i in 2u64..13u64 { + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 1); + } + for i in 13u64..18u64 { + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 2); + } + for i in 18u64..24u64 { + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 3); + }
for i in 24u64..30u64 { + assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 4); + } + } + } +} \ No newline at end of file diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 78fe1e811..549de17c1 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -209,8 +209,6 @@ impl IndexWriter { let mut segment_updater = self.segment_updater.clone(); let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); - // TODO fix this. the cursor might be too advanced - // at this point. let generation = self.generation; let join_handle: JoinHandle> = @@ -369,7 +367,6 @@ impl IndexWriter { // pending add segment commands will be dismissed. self.generation += 1; - // TODO requires a new delete queue... let rollback_future = self.segment_updater.rollback(self.generation); // we cannot drop segment ready receiver yet @@ -457,13 +454,11 @@ impl IndexWriter { // committed segments. self.committed_opstamp = self.stamp(); - // TODO remove clone - let future = self.segment_updater.commit(self.committed_opstamp); - // wait for the segment update thread to have processed the info - // TODO remove unwrap - future.wait().unwrap(); - + self.segment_updater + .commit(self.committed_opstamp) + .wait()?; + self.delete_queue.clear(); Ok(self.committed_opstamp) } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 0c28a4d79..fa8c94c99 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -149,9 +149,7 @@ impl IndexMerger { } assert!(min_val <= max_val); - - // TODO test deleting all documents off the index. 
- + try!(fast_field_serializer.new_u32_fast_field(field, min_val, max_val)); for (max_doc, u32_reader, delete_bitset) in u32_readers { for doc_id in 0..max_doc { diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index b9c4e47d2..f7f9ea868 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -136,21 +136,21 @@ impl SegmentUpdater { } - fn run_async T>(&self, f: F) -> impl Future { + fn run_async T>(&self, f: F) -> impl Future { let me_clone = self.clone(); self.0.pool.spawn_fn(move || { Ok(f(me_clone)) }) } - pub fn rollback(&mut self, generation: usize) -> impl Future { + pub fn rollback(&mut self, generation: usize) -> impl Future { self.0.generation.store(generation, Ordering::Release); self.run_async(|segment_updater| { segment_updater.0.segment_manager.rollback(); }) } - pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future { + pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future { if generation >= self.0.generation.load(Ordering::Acquire) { future::Either::A(self.run_async(|segment_updater| { segment_updater.0.segment_manager.add_segment(segment_entry); @@ -174,7 +174,7 @@ impl SegmentUpdater { .collect() } - pub fn commit(&self, opstamp: u64) -> impl Future { + pub fn commit(&self, opstamp: u64) -> impl Future { self.run_async(move |segment_updater| { let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes"); let segment_entries = segment_metas @@ -242,8 +242,7 @@ impl SegmentUpdater { .collect(); // An IndexMerger is like a "view" of our merged segments. - // TODO unwrap - let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); + let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?; let mut merged_segment = index.new_segment(); // ... 
we just serialize this index merger in our new segment @@ -284,7 +283,7 @@ impl SegmentUpdater { fn end_merge(&self, merged_segment_metas: Vec, - resulting_segment_entry: SegmentEntry) -> impl Future { + resulting_segment_entry: SegmentEntry) -> impl Future { self.run_async(move |segment_updater| { segment_updater.0.segment_manager.end_merge(&merged_segment_metas, resulting_segment_entry); diff --git a/src/query/boolean_query/mod.rs b/src/query/boolean_query/mod.rs index c04349687..a38d72f81 100644 --- a/src/query/boolean_query/mod.rs +++ b/src/query/boolean_query/mod.rs @@ -102,8 +102,6 @@ mod tests { } { let boolean_query = BooleanQuery::from(vec![(Occur::MustNot, make_term_query("d")),]); - // TODO optimize this use case : only MustNot subqueries... no need - // to read any postings. assert_eq!(matching_docs(&boolean_query), Vec::new()); } }