From dfbe337fe2bf9bda0ff2836cb56bacda2f499a63 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 13 Dec 2019 09:50:00 +0900 Subject: [PATCH] Optimize deletes (#723) Closes #710 --- Cargo.toml | 1 - src/directory/mod.rs | 3 ++ src/fastfield/delete.rs | 25 +++++------ src/indexer/index_writer.rs | 82 +++++++++++++++++++--------------- src/indexer/segment_entry.rs | 2 +- src/indexer/segment_updater.rs | 16 ++++--- 6 files changed, 70 insertions(+), 59 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 53f3cf73d..8e81ea7ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ fs2={version="0.4", optional=true} itertools = "0.8" levenshtein_automata = {version="0.1", features=["fst_automaton"]} notify = {version="4", optional=true} -bit-set = "0.5" uuid = { version = "0.8", features = ["v4", "serde"] } crossbeam = "0.7" futures = {version = "0.3", features=["thread-pool"] } diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 5efd87be3..ceabbc3cc 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -48,6 +48,9 @@ pub use self::mmap_directory::MmapDirectory; pub use self::managed_directory::ManagedDirectory; /// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly +/// +/// The point is that while the type is public, it cannot be built by anyone +/// outside of this module. 
pub struct AntiCallToken(()); /// Trait used to indicate when no more write need to be done on a writer diff --git a/src/fastfield/delete.rs b/src/fastfield/delete.rs index faee047ba..7be1f911e 100644 --- a/src/fastfield/delete.rs +++ b/src/fastfield/delete.rs @@ -1,9 +1,8 @@ -use crate::common::HasLen; +use crate::common::{BitSet, HasLen}; use crate::directory::ReadOnlySource; use crate::directory::WritePtr; use crate::space_usage::ByteCount; use crate::DocId; -use bit_set::BitSet; use std::io; use std::io::Write; @@ -17,7 +16,7 @@ pub fn write_delete_bitset( ) -> io::Result<()> { let mut byte = 0u8; let mut shift = 0u8; - for doc in 0..(max_doc as usize) { + for doc in 0..max_doc { if delete_bitset.contains(doc) { byte |= 1 << shift; } @@ -32,7 +31,7 @@ pub fn write_delete_bitset( if max_doc % 8 > 0 { writer.write_all(&[byte])?; } - writer.flush() + Ok(()) } /// Set of deleted `DocId`s. @@ -86,7 +85,6 @@ impl HasLen for DeleteBitSet { mod tests { use super::*; use crate::directory::*; - use bit_set::BitSet; use std::path::PathBuf; fn test_delete_bitset_helper(bitset: &BitSet, max_doc: u32) { @@ -95,27 +93,26 @@ mod tests { { let mut writer = directory.open_write(&*test_path).unwrap(); write_delete_bitset(bitset, max_doc, &mut writer).unwrap(); + writer.terminate().unwrap(); } - { - let source = directory.open_read(&test_path).unwrap(); - let delete_bitset = DeleteBitSet::open(source); - for doc in 0..max_doc as usize { - assert_eq!(bitset.contains(doc), delete_bitset.is_deleted(doc as DocId)); - } - assert_eq!(delete_bitset.len(), bitset.len()); + let source = directory.open_read(&test_path).unwrap(); + let delete_bitset = DeleteBitSet::open(source); + for doc in 0..max_doc { + assert_eq!(bitset.contains(doc), delete_bitset.is_deleted(doc as DocId)); } + assert_eq!(delete_bitset.len(), bitset.len()); } #[test] fn test_delete_bitset() { { - let mut bitset = BitSet::with_capacity(10); + let mut bitset = BitSet::with_max_value(10); bitset.insert(1); 
bitset.insert(9); test_delete_bitset_helper(&bitset, 10); } { - let mut bitset = BitSet::with_capacity(8); + let mut bitset = BitSet::with_max_value(8); bitset.insert(1); bitset.insert(2); bitset.insert(3); diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 1ff96e66b..66ab10e28 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -1,6 +1,7 @@ use super::operation::{AddOperation, UserOperation}; use super::segment_updater::SegmentUpdater; use super::PreparedCommit; +use crate::common::BitSet; use crate::core::Index; use crate::core::Segment; use crate::core::SegmentComponent; @@ -23,7 +24,6 @@ use crate::schema::Document; use crate::schema::IndexRecordOption; use crate::schema::Term; use crate::Opstamp; -use bit_set::BitSet; use crossbeam::channel; use futures::executor::block_on; use futures::future::Future; @@ -115,7 +115,7 @@ fn compute_deleted_bitset( while docset.advance() { let deleted_doc = docset.doc(); if deleted_doc < limit_doc { - delete_bitset.insert(deleted_doc as usize); + delete_bitset.insert(deleted_doc); might_have_changed = true; } } @@ -126,51 +126,61 @@ fn compute_deleted_bitset( Ok(might_have_changed) } -/// Advance delete for the given segment up -/// to the target opstamp. +/// Advance delete for the given segment up to the target opstamp. +/// +/// Note that there is no guarantee that the resulting `segment_entry` delete_opstamp +/// is `==` target_opstamp. +/// For instance, if there was no delete operation between the state of the `segment_entry` and +/// the `target_opstamp`, `segment_entry` is not updated. pub(crate) fn advance_deletes( mut segment: Segment, segment_entry: &mut SegmentEntry, target_opstamp: Opstamp, ) -> crate::Result<()> { - { - if segment_entry.meta().delete_opstamp() == Some(target_opstamp) { - // We are already up-to-date here. - return Ok(()); - } + if segment_entry.meta().delete_opstamp() == Some(target_opstamp) { + // We are already up-to-date here. 
+ return Ok(()); + } - let segment_reader = SegmentReader::open(&segment)?; + let mut delete_cursor = segment_entry.delete_cursor().clone(); + if segment_entry.delete_bitset().is_none() && delete_cursor.get().is_none() { + // There has been no `DeleteOperation` between the segment status and `target_opstamp`. + return Ok(()); + } - let max_doc = segment_reader.max_doc(); - let mut delete_bitset: BitSet = match segment_entry.delete_bitset() { - Some(previous_delete_bitset) => (*previous_delete_bitset).clone(), - None => BitSet::with_capacity(max_doc as usize), - }; + let segment_reader = SegmentReader::open(&segment)?; - let delete_cursor = segment_entry.delete_cursor(); - compute_deleted_bitset( - &mut delete_bitset, - &segment_reader, - delete_cursor, - &DocToOpstampMapping::None, - target_opstamp, - )?; + let max_doc = segment_reader.max_doc(); + let mut delete_bitset: BitSet = match segment_entry.delete_bitset() { + Some(previous_delete_bitset) => (*previous_delete_bitset).clone(), + None => BitSet::with_max_value(max_doc), + }; - // TODO optimize + compute_deleted_bitset( + &mut delete_bitset, + &segment_reader, + &mut delete_cursor, + &DocToOpstampMapping::None, + target_opstamp, + )?; + + // TODO optimize + if let Some(seg_delete_bitset) = segment_reader.delete_bitset() { for doc in 0u32..max_doc { - if segment_reader.is_deleted(doc) { - delete_bitset.insert(doc as usize); + if seg_delete_bitset.is_deleted(doc) { + delete_bitset.insert(doc); } } - - let num_deleted_docs = delete_bitset.len(); - if num_deleted_docs > 0 { - segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp); - let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; - write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?; - delete_file.terminate()?; - } } + + let num_deleted_docs = delete_bitset.len(); + if num_deleted_docs > 0 { + segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp); + let mut delete_file = 
segment.open_write(SegmentComponent::DELETE)?; + write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?; + delete_file.terminate()?; + } + segment_entry.set_meta(segment.meta().clone()); Ok(()) } @@ -236,7 +246,7 @@ fn apply_deletes( mut delete_cursor: &mut DeleteCursor, doc_opstamps: &[Opstamp], last_docstamp: Opstamp, -) -> crate::Result<Option<BitSet<u32>>> { +) -> crate::Result<Option<BitSet>> { if delete_cursor.get().is_none() { // if there are no delete operation in the queue, no need // to even open the segment. @@ -246,7 +256,7 @@ fn apply_deletes( let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps); let max_doc = segment.meta().max_doc(); - let mut deleted_bitset = BitSet::with_capacity(max_doc as usize); + let mut deleted_bitset = BitSet::with_max_value(max_doc); let may_have_deletes = compute_deleted_bitset( &mut deleted_bitset, &segment_reader, diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index 7f4c8856c..1808fd1da 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -1,7 +1,7 @@ +use crate::common::BitSet; use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::indexer::delete_queue::DeleteCursor; -use bit_set::BitSet; use std::fmt; /// A segment entry describes the state of diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 0b65bf5f9..16fbc6071 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -214,6 +214,10 @@ impl SegmentUpdater { self.pool.spawn_ok(async move { let _ = sender.send(f.await); }); + } else { + let _ = sender.send(Err(crate::TantivyError::SystemError( + "Segment updater killed".to_string(), + ))); } receiver.unwrap_or_else(|_| { let err_msg = @@ -326,13 +330,11 @@ impl SegmentUpdater { ) -> impl Future<Output = crate::Result<()>> { let segment_updater: SegmentUpdater = self.clone(); self.schedule_future(async move { - if segment_updater.is_alive() { - let segment_entries = segment_updater.purge_deletes(opstamp)?; - 
segment_updater.segment_manager.commit(segment_entries); - segment_updater.save_metas(opstamp, payload)?; - let _ = garbage_collect_files(segment_updater.clone()).await; - segment_updater.consider_merge_options().await; - } + let segment_entries = segment_updater.purge_deletes(opstamp)?; + segment_updater.segment_manager.commit(segment_entries); + segment_updater.save_metas(opstamp, payload)?; + let _ = garbage_collect_files(segment_updater.clone()).await; + segment_updater.consider_merge_options().await; Ok(()) }) }