diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index fb218fc93..d53bb9ad3 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -140,7 +140,6 @@ fn compute_deleted_bitset( /// For instance, there was no delete operation between the state of the `segment_entry` and /// the `target_opstamp`, `segment_entry` is not updated. pub(crate) fn advance_deletes( - mut segment: Segment, segment_entry: &mut SegmentEntry, target_opstamp: Opstamp, ) -> crate::Result<()> { @@ -149,28 +148,38 @@ pub(crate) fn advance_deletes( return Ok(()); } - if segment_entry.delete_bitset().is_none() && segment_entry.delete_cursor().get().is_none() { + let delete_bitset_opt = segment_entry.take_delete_bitset(); + + // We avoid directly advancing the `SegmentEntry` delete cursor, because + // we do not want to end up in an invalid state if the delete bitset + // serialization fails. + let mut delete_cursor = segment_entry.delete_cursor(); + + if delete_bitset_opt.is_none() && delete_cursor.get().is_none() { // There has been no `DeleteOperation` between the segment status and `target_opstamp`. return Ok(()); } + // We open our current serialized segment to compute the new deleted bitset. + let segment = segment_entry.segment().clone(); let segment_reader = SegmentReader::open(&segment)?; let max_doc = segment_reader.max_doc(); - let mut delete_bitset: BitSet = match segment_entry.delete_bitset() { - Some(previous_delete_bitset) => (*previous_delete_bitset).clone(), - None => BitSet::with_max_value(max_doc), - }; + let mut delete_bitset: BitSet = + delete_bitset_opt.unwrap_or_else(|| BitSet::with_max_value(max_doc)); + + let num_deleted_docs_before = segment.meta().num_deleted_docs(); compute_deleted_bitset( &mut delete_bitset, &segment_reader, - segment_entry.delete_cursor(), + &mut delete_cursor, &DocToOpstampMapping::None, target_opstamp, )?; - // TODO optimize + // TODO optimize... We are simply manipulating bitsets here. 
+ // We should be able to compute the union much faster. if let Some(seg_delete_bitset) = segment_reader.delete_bitset() { for doc in 0u32..max_doc { if seg_delete_bitset.is_deleted(doc) { @@ -179,15 +188,23 @@ pub(crate) fn advance_deletes( } } - let num_deleted_docs = delete_bitset.len(); - if num_deleted_docs > 0 { - segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp); - let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; + let num_deleted_docs = delete_bitset.len() as u32; + + if num_deleted_docs > num_deleted_docs_before { + // We need to write a new delete file. + let mut delete_file = segment + .with_delete_meta(num_deleted_docs, target_opstamp) + .open_write(SegmentComponent::DELETE)?; write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?; delete_file.terminate()?; + segment_entry.reset_delete_meta(num_deleted_docs as u32, target_opstamp); } - segment_entry.set_segment(segment); + // Regardless of whether we did end up having to write a new file or not + // we advance the `delete_cursor`. This is an optimisation. We want to ensure we do not + // check that a given deleted term does not match any of our docs more than once. 
+ segment_entry.set_delete_cursor(delete_cursor); + Ok(()) } diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index 67e88501a..a9e8f7bc6 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -32,12 +32,12 @@ impl<'a> PreparedCommit<'a> { pub fn commit(self) -> crate::Result { info!("committing {}", self.opstamp); - let _ = block_on( + block_on( self.index_writer .segment_updater() .schedule_commit(self.opstamp, self.payload), - ); - let _ = block_on(self.index_writer.trigger_commit()); + )?; + block_on(self.index_writer.trigger_commit()); Ok(self.opstamp) } } diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index 60db7ddb2..ba5df20eb 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -3,7 +3,7 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::directory::ManagedDirectory; use crate::indexer::delete_queue::DeleteCursor; -use crate::Segment; +use crate::{Opstamp, Segment}; use std::fmt; /// A segment entry describes the state of @@ -50,21 +50,32 @@ impl SegmentEntry { &self.segment } - /// Return a reference to the segment entry deleted bitset. + /// `Takes` (as in Option::take) the delete bitset of + /// a segment entry. /// /// `DocId` in this bitset are flagged as deleted. - pub fn delete_bitset(&self) -> Option<&BitSet> { - self.delete_bitset.as_ref() + pub fn take_delete_bitset(&mut self) -> Option { + self.delete_bitset.take() } - /// Set the `SegmentMeta` for this segment. - pub fn set_segment(&mut self, segment: Segment) { - self.segment = segment; + /// Reset the delete information in this segment. + /// + /// The `SegmentEntry` segment's `SegmentMeta` gets updated, and + /// any delete bitset is dropped and set to None. 
+ pub fn reset_delete_meta(&mut self, num_deleted_docs: u32, target_opstamp: Opstamp) { + self.segment = self + .segment + .clone() + .with_delete_meta(num_deleted_docs, target_opstamp); + self.delete_bitset = None; } + pub fn set_delete_cursor(&mut self, delete_cursor: DeleteCursor) { + self.delete_cursor = delete_cursor; + } /// Return a reference to the segment_entry's delete cursor - pub fn delete_cursor(&mut self) -> &mut DeleteCursor { - &mut self.delete_cursor + pub fn delete_cursor(&mut self) -> DeleteCursor { + self.delete_cursor.clone() } /// Returns the segment id. diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 502b42c29..119739dfa 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -116,8 +116,7 @@ fn merge( // First we apply all of the delet to the merged segment, up to the target opstamp. for segment_entry in &mut segment_entries { - let segment = index.segment(segment_entry.meta().clone()); - advance_deletes(segment, segment_entry, target_opstamp)?; + advance_deletes(segment_entry, target_opstamp)?; } let delete_cursor = segment_entries[0].delete_cursor().clone(); @@ -232,12 +231,11 @@ impl SegmentUpdater { pub fn schedule_add_segment( &self, - mut segment_entry: SegmentEntry, + segment_entry: SegmentEntry, ) -> impl Future> { // TODO temporary: serializing the segment at this point. 
let segment_updater = self.clone(); self.schedule_future(async move { - segment_entry.persist(segment_updater.index.directory().clone())?; segment_updater.segment_manager.add_segment(segment_entry); segment_updater.consider_merge_options().await; Ok(()) @@ -264,8 +262,7 @@ impl SegmentUpdater { fn purge_deletes(&self, target_opstamp: Opstamp) -> crate::Result> { let mut segment_entries = self.segment_manager.segment_entries(); for segment_entry in &mut segment_entries { - let segment = self.index.segment(segment_entry.meta().clone()); - advance_deletes(segment, segment_entry, target_opstamp)?; + advance_deletes(segment_entry, target_opstamp)?; } Ok(segment_entries) } @@ -335,8 +332,13 @@ impl SegmentUpdater { payload: Option, ) -> impl Future> { let segment_updater: SegmentUpdater = self.clone(); + let directory = self.index.directory().clone(); self.schedule_future(async move { - let segment_entries = segment_updater.purge_deletes(opstamp)?; + let mut segment_entries = segment_updater.purge_deletes(opstamp)?; + for segment_entry in &mut segment_entries { + let directory = directory.clone(); + segment_entry.persist(directory)?; + } segment_updater.segment_manager.commit(segment_entries); segment_updater.save_metas(opstamp, payload)?; let _ = garbage_collect_files(segment_updater.clone()).await; @@ -476,17 +478,14 @@ impl SegmentUpdater { let end_merge_future = self.schedule_future(async move { info!("End merge {:?}", after_merge_segment_entry.meta()); { - let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); + let mut delete_cursor = after_merge_segment_entry.delete_cursor(); if let Some(delete_operation) = delete_cursor.get() { let committed_opstamp = segment_updater.load_metas().opstamp; if delete_operation.opstamp < committed_opstamp { - let index = &segment_updater.index; - let segment = index.segment(after_merge_segment_entry.meta().clone()); - if let Err(e) = advance_deletes( - segment, - &mut after_merge_segment_entry, - 
committed_opstamp, - ) { + let _index = &segment_updater.index; + if let Err(e) = + advance_deletes(&mut after_merge_segment_entry, committed_opstamp) + { error!( "Merge of {:?} was cancelled (advancing deletes failed): {:?}", merge_operation.segment_ids(), diff --git a/src/query/term_query/mod.rs b/src/query/term_query/mod.rs index d38756b7e..396439fb2 100644 --- a/src/query/term_query/mod.rs +++ b/src/query/term_query/mod.rs @@ -112,6 +112,7 @@ mod tests { let term_a = Term::from_field_text(text_field, "a"); let term_query = TermQuery::new(term_a, IndexRecordOption::Basic); let reader = index.reader().unwrap(); + assert_eq!(reader.searcher().segment_readers().len(), 1); assert_eq!(term_query.count(&*reader.searcher()).unwrap(), 1); }