From ddb2b8d807b1405f220fe69699fdbae6e6f9804e Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 26 Mar 2017 17:33:42 +0900 Subject: [PATCH] test passing. SegmentWriter create SegmentEntry which contain a delete_bitset --- src/indexer/index_writer.rs | 59 +++++++++++++++++----------------- src/indexer/merger.rs | 2 +- src/indexer/segment_updater.rs | 8 ++++- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 6b7a6419d..6a373f568 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -160,8 +160,9 @@ pub fn compute_deleted_bitset( segment_reader: &SegmentReader, delete_cursor: &mut DeleteCursor, doc_opstamps: DocToOpstampMapping, - target_opstamp: u64) -> Result<(Option)> { + target_opstamp: u64) -> Result { + let mut might_have_changed = false; loop { if let Some(delete_op) = delete_cursor.peek() { @@ -180,6 +181,7 @@ pub fn compute_deleted_bitset( let deleted_doc = docset.doc(); if deleted_doc < limit_doc { delete_bitset.insert(deleted_doc as usize); + might_have_changed = true; } } } @@ -190,13 +192,7 @@ pub fn compute_deleted_bitset( } delete_cursor.advance(); } - - if !delete_bitset.is_empty() { - Ok(Some(delete_bitset)) - } - else { - Ok(None) - } + Ok(might_have_changed) } @@ -213,44 +209,45 @@ pub fn advance_deletes( segment_entry: &mut SegmentEntry, target_opstamp: u64) -> Result<()> { - { - + { + if let Some(previous_opstamp) = segment_entry.meta().delete_opstamp() { + // We are already up-to-date here. + if target_opstamp == previous_opstamp { + return Ok(()); + } + } let segment_reader = SegmentReader::open(segment.clone())?; - + let max_doc = segment_reader.max_doc(); + let mut delete_bitset: BitSet = match segment_entry.reset_delete_bitset() { Some(previous_delete_bitset) => { previous_delete_bitset }, None => { - BitSet::with_capacity(segment_reader.max_doc() as usize) + BitSet::with_capacity(max_doc as usize) } }; let delete_cursor = segment_entry.delete_cursor(); - let new_deleted_bitset = compute_deleted_bitset( + compute_deleted_bitset( + &mut delete_bitset, &segment_reader, delete_cursor, - &mut delete_bitset, DocToOpstampMapping::None, target_opstamp)?; - // we only write the result different - // iff we ended ended up increasing the delete opstamp - // - // TODO just move the file if there was no new delete? - if let Some(mut delete_bitset) = new_deleted_bitset { - for doc in 0u32..segment_reader.max_doc() { - if segment_reader.is_deleted(doc) { - delete_bitset.insert(doc as usize); - } + for doc in 0u32..max_doc { + if segment_reader.is_deleted(doc) { + delete_bitset.insert(doc as usize); } - let num_deleted_docs = delete_bitset.len(); - segment.set_delete_meta(num_deleted_docs as u32, target_opstamp); - let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; - write_delete_bitset(&delete_bitset, &mut delete_file)?; } + + let num_deleted_docs = delete_bitset.len(); + segment.set_delete_meta(num_deleted_docs as u32, target_opstamp); + let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; + write_delete_bitset(&delete_bitset, &mut delete_file)?; } segment_entry.set_meta(segment.meta().clone()); @@ -291,7 +288,9 @@ fn index_documents(heap: &mut Heap, let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps); let segment_reader = SegmentReader::open(segment)?; - let delete_bitset = compute_deleted_bitset( + let mut deleted_bitset = BitSet::with_capacity(num_docs as usize); + let may_have_deletes = compute_deleted_bitset( + &mut deleted_bitset, &segment_reader, &mut delete_cursor, doc_to_opstamps, @@ -301,7 +300,9 @@ fn index_documents(heap: &mut Heap, let segment_entry = SegmentEntry::new( segment_meta, delete_cursor, - delete_bitset); + { if may_have_deletes { Some(deleted_bitset) } + else { None } } + ); segment_updater .add_segment(generation, segment_entry) diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index c087b93f4..596c4d696 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -307,7 +307,7 @@ mod tests { use futures::Future; #[test] - fn test_index_merger() { + fn test_index_merger_no_deletes() { let mut schema_builder = schema::SchemaBuilder::default(); let text_fieldtype = schema::TextOptions::default() .set_indexing_options(TextIndexingOptions::TokenizedWithFreq) diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 3d325d904..c12ac59e7 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -300,7 +300,13 @@ impl SegmentUpdater { .end_merge(segment_ids_vec, after_merge_segment_entry) .wait() .expect("Segment updater thread is corrupted."); - merging_future_send.complete(merged_segment_meta); + + // the future may fail if the listener of the oneshot future + // has been destroyed. + // + // This is not a problem here, so we just ignore any + // possible error. + let _merging_future_res = merging_future_send.send(merged_segment_meta); } Err(_) => { // ... cancel merge