diff --git a/src/core/segment.rs b/src/core/segment.rs index da5efb7b4..cc085d1b1 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -107,6 +107,7 @@ impl Segment { Ok(()) } + /// Returns our index's schema. pub fn schema(&self) -> Schema { self.schema.clone() diff --git a/src/directory/spilling_writer.rs b/src/directory/spilling_writer.rs index ed0c3a54c..dac2c155d 100644 --- a/src/directory/spilling_writer.rs +++ b/src/directory/spilling_writer.rs @@ -63,15 +63,19 @@ impl SpillingWriter { } pub fn flush_and_finalize(self) -> io::Result<()> { - if let SpillingState::Buffer { - buffer, - write_factory, - .. - } = self.state.expect("State cannot be none") { - let mut wrt = write_factory()?; - wrt.write_all(&buffer[..])?; - wrt.flush()?; - wrt.terminate()?; + match self.state.expect("State cannot be none") { + SpillingState::Buffer { + buffer, + write_factory, + .. + } => { + let mut wrt = write_factory()?; + wrt.write_all(&buffer[..])?; + wrt.terminate()?; + } + SpillingState::Spilled(wrt) => { + wrt.terminate()?; + } } Ok(()) } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 97602797e..74d6cf2ec 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -1,4 +1,5 @@ use super::operation::{AddOperation, UserOperation}; +use crate::indexer::segment_manager::SegmentRegisters; use super::segment_updater::SegmentUpdater; use super::PreparedCommit; use crate::common::BitSet; @@ -32,9 +33,10 @@ use smallvec::smallvec; use smallvec::SmallVec; use std::mem; use std::ops::Range; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread; use std::thread::JoinHandle; +use crate::indexer::segment_register::SegmentRegister; // Size of the margin for the heap. A segment is closed when the remaining memory // in the heap goes below MARGIN_IN_BYTES. @@ -71,6 +73,8 @@ pub struct IndexWriter { index: Index, + segment_registers: Arc>, + heap_size_in_bytes_per_thread: usize, workers_join_handle: Vec>>, @@ -134,7 +138,6 @@ fn compute_deleted_bitset( /// For instance, there was no delete operation between the state of the `segment_entry` and /// the `target_opstamp`, `segment_entry` is not updated. pub(crate) fn advance_deletes( - mut segment: Segment, segment_entry: &mut SegmentEntry, target_opstamp: Opstamp, ) -> crate::Result<()> { @@ -143,25 +146,33 @@ pub(crate) fn advance_deletes( return Ok(()); } - if segment_entry.delete_bitset().is_none() && segment_entry.delete_cursor().get().is_none() { + let delete_bitset_opt = segment_entry.take_delete_bitset(); + + // We avoid directly advancing the `SegmentEntry` delete cursor, because + // we do not want to end up in an invalid state if the delete bitset + // serialization fails. + let mut delete_cursor = segment_entry.delete_cursor(); + + if delete_bitset_opt.is_none() && segment_entry.delete_cursor().get().is_none() { // There has been no `DeleteOperation` between the segment status and `target_opstamp`. return Ok(()); } + // We open our current serialized segment to compute the new deleted bitset. + let segment = segment_entry.segment().clone(); let segment_reader = SegmentReader::open(&segment)?; let max_doc = segment_reader.max_doc(); - let mut delete_bitset: BitSet = match segment_entry.delete_bitset() { - Some(previous_delete_bitset) => (*previous_delete_bitset).clone(), - None => BitSet::with_max_value(max_doc), - }; + + let mut delete_bitset: BitSet = + delete_bitset_opt.unwrap_or_else(|| BitSet::with_max_value(max_doc)); let num_deleted_docs_before = segment.meta().num_deleted_docs(); compute_deleted_bitset( &mut delete_bitset, &segment_reader, - segment_entry.delete_cursor(), + &mut delete_cursor, &DocToOpstampMapping::None, target_opstamp, )?; @@ -180,13 +191,18 @@ pub(crate) fn advance_deletes( let num_deleted_docs: u32 = delete_bitset.len() as u32; if num_deleted_docs > num_deleted_docs_before { // There are new deletes. We need to write a new delete file. - segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp); - let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; + let mut delete_file = segment + .with_delete_meta(num_deleted_docs as u32, target_opstamp) + .open_write(SegmentComponent::DELETE)?; write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?; delete_file.terminate()?; + segment_entry.reset_delete_meta(num_deleted_docs as u32, target_opstamp); } - segment_entry.set_meta(segment.meta().clone()); + // Regardless of whether we did end up having to write a new file or not + // we advance the `delete_cursor`. This is an optimisation. We want to ensure we do not + // check that a given deleted term does not match any of our docs more than once. + segment_entry.set_delete_cursor(delete_cursor); Ok(()) } @@ -240,7 +256,7 @@ fn index_documents( )?; let segment_entry = SegmentEntry::new( - segment_with_max_doc.meta().clone(), + segment_with_max_doc, delete_cursor, delete_bitset_opt, ); @@ -317,10 +333,20 @@ impl IndexWriter { let current_opstamp = index.load_metas()?.opstamp; + let meta = index.load_metas()?; + let stamper = Stamper::new(current_opstamp); + let commited_segments = SegmentRegister::new( + index.directory(), + &index.schema(), + meta.segments, + &delete_queue.cursor(), + ); + let segment_registers = Arc::new(RwLock::new(SegmentRegisters::new(commited_segments))); + let segment_updater = - SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?; + SegmentUpdater::create(segment_registers.clone(), index.clone(), stamper.clone())?; let mut index_writer = IndexWriter { _directory_lock: Some(directory_lock), @@ -342,6 +368,7 @@ impl IndexWriter { stamper, worker_id: 0, + segment_registers }; index_writer.start_workers()?; Ok(index_writer) @@ -381,13 +408,6 @@ impl IndexWriter { result } - #[doc(hidden)] - pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> { - let delete_cursor = self.delete_queue.cursor(); - let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None); - block_on(self.segment_updater.schedule_add_segment(segment_entry)) - } - /// Creates a new segment. /// /// This method is useful only for users trying to do complex diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index f2531a7b8..c819fb4c5 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,4 +1,5 @@ use crate::common::MAX_DOC_LIMIT; +use crate::directory::TerminatingWrite; use crate::core::Segment; use crate::core::SegmentReader; use crate::core::SerializableSegment; @@ -674,7 +675,8 @@ impl IndexMerger { store_writer.stack(&store_reader)?; } } - store_writer.close()?; + let store_wrt = store_writer.close()?; + store_wrt.terminate()?; Ok(()) } } diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index 1808fd1da..1ab52accc 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -3,6 +3,7 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::indexer::delete_queue::DeleteCursor; use std::fmt; +use crate::{Segment, Opstamp}; /// A segment entry describes the state of /// a given segment, at a given instant. @@ -19,7 +20,7 @@ use std::fmt; /// in the .del file or in the `delete_bitset`. #[derive(Clone)] pub struct SegmentEntry { - meta: SegmentMeta, + segment: Segment, delete_bitset: Option, delete_cursor: DeleteCursor, } @@ -27,12 +28,12 @@ pub struct SegmentEntry { impl SegmentEntry { /// Create a new `SegmentEntry` pub fn new( - segment_meta: SegmentMeta, + segment: Segment, delete_cursor: DeleteCursor, delete_bitset: Option, ) -> SegmentEntry { SegmentEntry { - meta: segment_meta, + segment, delete_bitset, delete_cursor, } @@ -45,29 +46,52 @@ impl SegmentEntry { self.delete_bitset.as_ref() } - /// Set the `SegmentMeta` for this segment. - pub fn set_meta(&mut self, segment_meta: SegmentMeta) { - self.meta = segment_meta; + + pub fn set_delete_cursor(&mut self, delete_cursor: DeleteCursor) { + self.delete_cursor = delete_cursor; + } + + /// `Takes` (as in Option::take) the delete bitset of a segment entry. + /// `DocId` in this bitset are flagged as deleted. + pub fn take_delete_bitset(&mut self) -> Option { + self.delete_bitset.take() + } + + /// Reset the delete information in this segment. + /// + /// The `SegmentEntry` segment's `SegmentMeta` gets updated, and + /// any delete bitset is drop and set to None. + pub fn reset_delete_meta(&mut self, num_deleted_docs: u32, target_opstamp: Opstamp) { + self.segment = self + .segment + .clone() + .with_delete_meta(num_deleted_docs, target_opstamp); + self.delete_bitset = None; } /// Return a reference to the segment_entry's delete cursor - pub fn delete_cursor(&mut self) -> &mut DeleteCursor { - &mut self.delete_cursor + pub fn delete_cursor(&mut self) -> DeleteCursor { + self.delete_cursor.clone() } /// Returns the segment id. pub fn segment_id(&self) -> SegmentId { - self.meta.id() + self.meta().id() } + /// Returns the `segment` associated to the `SegmentEntry`. + pub fn segment(&self) -> &Segment { + &self.segment + } /// Accessor to the `SegmentMeta` pub fn meta(&self) -> &SegmentMeta { - &self.meta + self.segment.meta() } } + impl fmt::Debug for SegmentEntry { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(formatter, "SegmentEntry({:?})", self.meta) + write!(formatter, "SegmentEntry({:?})", self.meta()) } } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 5af52313d..18532f514 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -2,15 +2,15 @@ use super::segment_register::SegmentRegister; use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::error::TantivyError; -use crate::indexer::delete_queue::DeleteCursor; use crate::indexer::SegmentEntry; use std::collections::hash_set::HashSet; use std::fmt::{self, Debug, Formatter}; -use std::sync::RwLock; +use std::sync::{RwLock, Arc}; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; +use crate::Segment; #[derive(Default)] -struct SegmentRegisters { +pub(crate) struct SegmentRegisters { uncommitted: SegmentRegister, committed: SegmentRegister, } @@ -22,6 +22,18 @@ pub(crate) enum SegmentsStatus { } impl SegmentRegisters { + + + pub fn new(committed: SegmentRegister) -> SegmentRegisters { + SegmentRegisters { + uncommitted: Default::default(), + committed, + } + } + + pub fn committed_segment(&self) -> Vec { + self.committed.segments() + } /// Check if all the segments are committed or uncommited. /// /// If some segment is missing or segments are in a different state (this should not happen @@ -44,7 +56,7 @@ impl SegmentRegisters { /// changes (merges especially) #[derive(Default)] pub struct SegmentManager { - registers: RwLock, + registers: Arc>, } impl Debug for SegmentManager { @@ -74,15 +86,11 @@ pub fn get_mergeable_segments( } impl SegmentManager { - pub fn from_segments( - segment_metas: Vec, - delete_cursor: &DeleteCursor, - ) -> SegmentManager { + + + pub(crate) fn new(registers: Arc>) -> SegmentManager { SegmentManager { - registers: RwLock::new(SegmentRegisters { - uncommitted: SegmentRegister::default(), - committed: SegmentRegister::new(segment_metas, delete_cursor), - }), + registers } } diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 53e9e5285..a3e554a02 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -5,6 +5,9 @@ use crate::indexer::segment_entry::SegmentEntry; use std::collections::HashMap; use std::collections::HashSet; use std::fmt::{self, Debug, Formatter}; +use crate::Segment; +use crate::directory::ManagedDirectory; +use crate::schema::Schema; /// The segment register keeps track /// of the list of segment, their size as well @@ -45,6 +48,13 @@ impl SegmentRegister { .map(|segment_entry| segment_entry.meta().clone()) .collect() } + + pub fn segments(&self) -> Vec { + self.segment_states + .values() + .map(|segment_entry| segment_entry.segment().clone()) + .collect() + } pub fn segment_entries(&self) -> Vec { self.segment_states.values().cloned().collect() @@ -79,11 +89,17 @@ impl SegmentRegister { self.segment_states.get(segment_id).cloned() } - pub fn new(segment_metas: Vec, delete_cursor: &DeleteCursor) -> SegmentRegister { + pub fn new( + directory: &ManagedDirectory, + schema: &Schema, + segment_metas: Vec, + delete_cursor: &DeleteCursor, + ) -> SegmentRegister { let mut segment_states = HashMap::new(); for segment_meta in segment_metas { let segment_id = segment_meta.id(); - let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None); + let segment = Segment::new_persisted(segment_meta, directory.clone(), schema.clone()); + let segment_entry = SegmentEntry::new(segment, delete_cursor.clone(), None); segment_states.insert(segment_id, segment_entry); } SegmentRegister { segment_states } @@ -108,6 +124,7 @@ mod tests { fn test_segment_register() { let inventory = SegmentMetaInventory::default(); let delete_queue = DeleteQueue::new(); + let schema = Schema::builder().build(); let mut segment_register = SegmentRegister::default(); let segment_id_a = SegmentId::generate_random(); @@ -115,21 +132,24 @@ mod tests { let segment_id_merged = SegmentId::generate_random(); { - let segment_meta = inventory.new_segment_meta(segment_id_a, 0u32); - let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None); + let meta = inventory.new_segment_meta(segment_id_a, 0u32); + let segment = Segment::new_volatile(meta, schema.clone()); + let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_ids(&segment_register), vec![segment_id_a]); { - let segment_meta = inventory.new_segment_meta(segment_id_b, 0u32); - let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None); + let meta = inventory.new_segment_meta(segment_id_b, 0u32); + let segment = Segment::new_volatile(meta, schema.clone()); + let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } segment_register.remove_segment(&segment_id_a); segment_register.remove_segment(&segment_id_b); { let segment_meta_merged = inventory.new_segment_meta(segment_id_merged, 0u32); - let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None); + let segment_merged = Segment::new_volatile(segment_meta_merged, schema.clone()); + let segment_entry = SegmentEntry::new(segment_merged, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]); diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 5d3c09562..1d41bd25a 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -7,11 +7,10 @@ use crate::core::SegmentMeta; use crate::core::SerializableSegment; use crate::core::META_FILEPATH; use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult}; -use crate::indexer::delete_queue::DeleteCursor; use crate::indexer::index_writer::advance_deletes; use crate::indexer::merge_operation::MergeOperationInventory; use crate::indexer::merger::IndexMerger; -use crate::indexer::segment_manager::SegmentsStatus; +use crate::indexer::segment_manager::{SegmentsStatus, SegmentRegisters}; use crate::indexer::stamper::Stamper; use crate::indexer::SegmentEntry; use crate::indexer::SegmentSerializer; @@ -117,8 +116,7 @@ fn merge( // First we apply all of the delet to the merged segment, up to the target opstamp. for segment_entry in &mut segment_entries { - let segment = index.segment(segment_entry.meta().clone()); - advance_deletes(segment, segment_entry, target_opstamp)?; + advance_deletes( segment_entry, target_opstamp)?; } let delete_cursor = segment_entries[0].delete_cursor().clone(); @@ -137,10 +135,9 @@ fn merge( let store_wrt = merged_segment.open_write(SegmentComponent::STORE)?; merger.write_storable_fields(store_wrt)?; - let num_docs = merger.write(segment_serializer)?; - let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs); + let max_doc = merger.write(segment_serializer)?; - Ok(SegmentEntry::new(segment_meta, delete_cursor, None)) + Ok(SegmentEntry::new(merged_segment.with_max_doc(max_doc), delete_cursor, None)) } pub(crate) struct InnerSegmentUpdater { @@ -164,12 +161,11 @@ pub(crate) struct InnerSegmentUpdater { impl SegmentUpdater { pub fn create( + segment_registers: Arc>, index: Index, - stamper: Stamper, - delete_cursor: &DeleteCursor, + stamper: Stamper ) -> crate::Result { - let segments = index.searchable_segment_metas()?; - let segment_manager = SegmentManager::from_segments(segments, delete_cursor); + let segment_manager = SegmentManager::new(segment_registers); let pool = ThreadPoolBuilder::new() .name_prefix("segment_updater") .pool_size(1) @@ -264,8 +260,7 @@ impl SegmentUpdater { fn purge_deletes(&self, target_opstamp: Opstamp) -> crate::Result> { let mut segment_entries = self.segment_manager.segment_entries(); for segment_entry in &mut segment_entries { - let segment = self.index.segment(segment_entry.meta().clone()); - advance_deletes(segment, segment_entry, target_opstamp)?; + advance_deletes(segment_entry, target_opstamp)?; } Ok(segment_entries) } @@ -480,10 +475,7 @@ impl SegmentUpdater { if let Some(delete_operation) = delete_cursor.get() { let committed_opstamp = segment_updater.load_metas().opstamp; if delete_operation.opstamp < committed_opstamp { - let index = &segment_updater.index; - let segment = index.segment(after_merge_segment_entry.meta().clone()); if let Err(e) = advance_deletes( - segment, &mut after_merge_segment_entry, committed_opstamp, ) { diff --git a/src/store/writer.rs b/src/store/writer.rs index c74a16ad9..60306ce09 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -100,6 +100,8 @@ impl StoreWriter { /// /// Compress the last unfinished block if any, /// and serializes the skip list index on disc. + /// + /// The returned writer is not flushed. pub fn close(mut self) -> io::Result { if !self.current_block.is_empty() { self.write_and_compress_block()?; @@ -108,7 +110,6 @@ impl StoreWriter { self.offset_index_writer.write(&mut self.writer)?; header_offset.serialize(&mut self.writer)?; self.doc.serialize(&mut self.writer)?; - self.writer.flush()?; let (wrt, _) = self.writer.finish()?; Ok(wrt)