diff --git a/src/core/index.rs b/src/core/index.rs index 28d49a1dd..bd340bb40 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -1,4 +1,3 @@ -use super::segment::create_segment; use super::segment::Segment; use crate::core::Executor; use crate::core::IndexMeta; @@ -331,9 +330,8 @@ impl Index { .collect()) } - #[doc(hidden)] - pub fn segment(&self, segment_meta: SegmentMeta) -> Segment { - create_segment(self.clone(), segment_meta) + pub(crate) fn segment(&self, segment_meta: SegmentMeta) -> Segment { + Segment::for_index(self.clone(), segment_meta) } /// Creates a new segment. @@ -344,6 +342,13 @@ impl Index { self.segment(segment_meta) } + /// Creates a new segment. + pub(crate) fn new_segment_unpersisted(&self) -> Segment { + let meta = self + .inventory + .new_segment_meta(SegmentId::generate_random(), 0); + Segment::new_volatile(meta, self.schema()) + } /// Return a reference to the index directory. pub fn directory(&self) -> &ManagedDirectory { &self.directory diff --git a/src/core/segment.rs b/src/core/segment.rs index 90f1139b3..03836d1d5 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -3,51 +3,70 @@ use crate::core::Index; use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::directory::error::{OpenReadError, OpenWriteError}; -use crate::directory::{Directory, DirectoryClone}; +use crate::directory::{Directory, ManagedDirectory, RAMDirectory}; use crate::directory::{ReadOnlySource, WritePtr}; use crate::indexer::segment_serializer::SegmentSerializer; use crate::schema::Schema; use crate::Opstamp; use crate::Result; +use failure::_core::ops::DerefMut; use std::fmt; +use std::ops::Deref; use std::path::PathBuf; use std::result; -/// A segment is a piece of the index. -pub struct Segment { - schema: Schema, - directory: Box, - meta: SegmentMeta, +#[derive(Clone)] +pub(crate) enum SegmentDirectory { + Persisted(ManagedDirectory), + Volatile(RAMDirectory), } -impl Clone for Segment { - fn clone(&self) -> Self { - Segment { - schema: self.schema.clone(), - directory: self.directory.box_clone(), - meta: self.meta.clone(), +impl SegmentDirectory { + pub fn new_volatile() -> SegmentDirectory { + SegmentDirectory::Volatile(RAMDirectory::default()) + } +} + +impl From for SegmentDirectory { + fn from(directory: ManagedDirectory) -> Self { + SegmentDirectory::Persisted(directory) + } +} + +impl Deref for SegmentDirectory { + type Target = dyn Directory; + + fn deref(&self) -> &Self::Target { + match self { + SegmentDirectory::Volatile(dir) => dir, + SegmentDirectory::Persisted(dir) => dir, } } } +impl DerefMut for SegmentDirectory { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + SegmentDirectory::Volatile(dir) => dir, + SegmentDirectory::Persisted(dir) => dir, + } + } +} + +/// A segment is a piece of the index. +#[derive(Clone)] +pub struct Segment { + schema: Schema, + meta: SegmentMeta, + directory: SegmentDirectory, +} + impl fmt::Debug for Segment { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Segment({:?})", self.id().uuid_string()) } } -/// Creates a new segment given an `Index` and a `SegmentId` -/// -/// The function is here to make it private outside `tantivy`. -/// #[doc(hidden)] -pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment { - Segment { - directory: index.directory().box_clone(), - schema: index.schema(), - meta, - } -} - impl Segment { /// Returns our index's schema. // TODO return a ref. @@ -55,6 +74,51 @@ impl Segment { self.schema.clone() } + pub(crate) fn new_persisted( + meta: SegmentMeta, + directory: ManagedDirectory, + schema: Schema, + ) -> Segment { + Segment { + meta, + schema, + directory: SegmentDirectory::from(directory), + } + } + + /// Creates a new segment that embeds its own `RAMDirectory`. + /// + /// That segment is entirely dissociated from the index directory. + /// It will be persisted by a background thread in charge of IO. + pub fn new_unpersisted(meta: SegmentMeta, schema: Schema) -> Segment { + Segment { + schema, + meta, + directory: SegmentDirectory::new_volatile(), + } + } + + /// Creates a new segment given an `Index` and a `SegmentId` + pub(crate) fn for_index(index: Index, meta: SegmentMeta) -> Segment { + Segment { + directory: SegmentDirectory::Persisted(index.directory().clone()), + schema: index.schema(), + meta, + } + } + + pub fn persist(&mut self, mut dest_directory: ManagedDirectory) -> crate::Result<()> { + if let SegmentDirectory::Persisted(_) = self.directory { + // this segment is already persisted. + return Ok(()); + } + if let SegmentDirectory::Volatile(ram_directory) = &self.directory { + ram_directory.persist(&mut dest_directory)?; + } + self.directory = SegmentDirectory::Persisted(dest_directory); + Ok(()) + } + /// Returns the segment meta-information pub fn meta(&self) -> &SegmentMeta { &self.meta diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 5ee271dc8..de5a8112b 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -144,6 +144,16 @@ impl RAMDirectory { pub fn total_mem_usage(&self) -> usize { self.fs.read().unwrap().total_mem_usage() } + + pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> { + let wlock = self.fs.write().unwrap(); + for (path, source) in wlock.fs.iter() { + let mut dest_wrt = dest.open_write(path)?; + dest_wrt.write_all(source.as_slice())?; + dest_wrt.terminate()?; + } + Ok(()) + } } impl Directory for RAMDirectory { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index ba597b867..0b6cda747 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -181,7 +181,7 @@ pub(crate) fn advance_deletes( delete_file.terminate()?; } - segment_entry.set_meta(segment.meta().clone()); + segment_entry.set_segment(segment); Ok(()) } @@ -233,11 +233,7 @@ fn index_documents( last_docstamp, )?; - let segment_entry = SegmentEntry::new( - segment_with_max_doc.meta().clone(), - delete_cursor, - delete_bitset_opt, - ); + let segment_entry = SegmentEntry::new(segment_with_max_doc, delete_cursor, delete_bitset_opt); block_on(segment_updater.schedule_add_segment(segment_entry))?; Ok(true) } @@ -375,13 +371,6 @@ impl IndexWriter { result } - #[doc(hidden)] - pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> { - let delete_cursor = self.delete_queue.cursor(); - let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None); - block_on(self.segment_updater.schedule_add_segment(segment_entry)) - } - /// Creates a new segment. /// /// This method is useful only for users trying to do complex @@ -430,7 +419,7 @@ impl IndexWriter { // was dropped. return Ok(()); } - let segment = index.new_segment(); + let segment = index.new_segment_unpersisted(); index_documents( mem_budget, segment, diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index 1808fd1da..c949255df 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -1,7 +1,9 @@ use crate::common::BitSet; use crate::core::SegmentId; use crate::core::SegmentMeta; +use crate::directory::ManagedDirectory; use crate::indexer::delete_queue::DeleteCursor; +use crate::Segment; use std::fmt; /// A segment entry describes the state of @@ -19,25 +21,31 @@ use std::fmt; /// in the .del file or in the `delete_bitset`. #[derive(Clone)] pub struct SegmentEntry { - meta: SegmentMeta, + segment: Segment, delete_bitset: Option, delete_cursor: DeleteCursor, } impl SegmentEntry { /// Create a new `SegmentEntry` - pub fn new( - segment_meta: SegmentMeta, + pub(crate) fn new( + segment: Segment, delete_cursor: DeleteCursor, delete_bitset: Option, ) -> SegmentEntry { SegmentEntry { - meta: segment_meta, + segment, delete_bitset, delete_cursor, } } + pub fn persist(&mut self, dest_directory: ManagedDirectory) -> crate::Result<()> { + // TODO take in account delete bitset? + self.segment.persist(dest_directory)?; + Ok(()) + } + /// Return a reference to the segment entry deleted bitset. /// /// `DocId` in this bitset are flagged as deleted. @@ -46,8 +54,8 @@ impl SegmentEntry { } /// Set the `SegmentMeta` for this segment. - pub fn set_meta(&mut self, segment_meta: SegmentMeta) { - self.meta = segment_meta; + pub fn set_segment(&mut self, segment: Segment) { + self.segment = segment; } /// Return a reference to the segment_entry's delete cursor @@ -57,17 +65,22 @@ impl SegmentEntry { /// Returns the segment id. pub fn segment_id(&self) -> SegmentId { - self.meta.id() + self.segment.id() } /// Accessor to the `SegmentMeta` pub fn meta(&self) -> &SegmentMeta { - &self.meta + self.segment.meta() } } impl fmt::Debug for SegmentEntry { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(formatter, "SegmentEntry({:?})", self.meta) + let num_deletes = self.delete_bitset.as_ref().map(|bitset| bitset.len()); + write!( + formatter, + "SegmentEntry(seg={:?}, ndel={:?})", + self.segment, num_deletes + ) } } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 066f8f7b2..9ad2dc28f 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -3,6 +3,7 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::indexer::delete_queue::DeleteCursor; use crate::indexer::SegmentEntry; +use crate::Index; use std::collections::hash_set::HashSet; use std::fmt::{self, Debug, Formatter}; use std::sync::RwLock; @@ -74,13 +75,19 @@ pub fn get_mergeable_segments( impl SegmentManager { pub fn from_segments( + index: &Index, segment_metas: Vec, delete_cursor: &DeleteCursor, ) -> SegmentManager { SegmentManager { registers: RwLock::new(SegmentRegisters { uncommitted: SegmentRegister::default(), - committed: SegmentRegister::new(segment_metas, delete_cursor), + committed: SegmentRegister::new( + index.directory(), + &index.schema(), + segment_metas, + delete_cursor, + ), }), } } diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 53e9e5285..5fa025f0d 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -1,7 +1,10 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; +use crate::directory::ManagedDirectory; use crate::indexer::delete_queue::DeleteCursor; use crate::indexer::segment_entry::SegmentEntry; +use crate::schema::Schema; +use crate::Segment; use std::collections::HashMap; use std::collections::HashSet; use std::fmt::{self, Debug, Formatter}; @@ -79,11 +82,17 @@ impl SegmentRegister { self.segment_states.get(segment_id).cloned() } - pub fn new(segment_metas: Vec, delete_cursor: &DeleteCursor) -> SegmentRegister { + pub fn new( + directory: &ManagedDirectory, + schema: &Schema, + segment_metas: Vec, + delete_cursor: &DeleteCursor, + ) -> SegmentRegister { let mut segment_states = HashMap::new(); - for segment_meta in segment_metas { - let segment_id = segment_meta.id(); - let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None); + for meta in segment_metas { + let segment_id = meta.id(); + let segment = Segment::new_persisted(meta, directory.clone(), schema.clone()); + let segment_entry = SegmentEntry::new(segment, delete_cursor.clone(), None); segment_states.insert(segment_id, segment_entry); } SegmentRegister { segment_states } @@ -108,6 +117,7 @@ mod tests { fn test_segment_register() { let inventory = SegmentMetaInventory::default(); let delete_queue = DeleteQueue::new(); + let schema = Schema::builder().build(); let mut segment_register = SegmentRegister::default(); let segment_id_a = SegmentId::generate_random(); @@ -115,21 +125,24 @@ mod tests { let segment_id_merged = SegmentId::generate_random(); { - let segment_meta = inventory.new_segment_meta(segment_id_a, 0u32); - let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None); + let meta = inventory.new_segment_meta(segment_id_a, 0u32); + let segment = Segment::new_volatile(meta, schema.clone()); + let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_ids(&segment_register), vec![segment_id_a]); { let segment_meta = inventory.new_segment_meta(segment_id_b, 0u32); - let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None); + let segment = Segment::new_volatile(segment_meta, schema.clone()); + let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } segment_register.remove_segment(&segment_id_a); segment_register.remove_segment(&segment_id_b); { let segment_meta_merged = inventory.new_segment_meta(segment_id_merged, 0u32); - let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None); + let segment = Segment::new_volatile(segment_meta_merged, schema); + let segment_entry = SegmentEntry::new(segment, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]); diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 16fbc6071..de108cff2 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -134,11 +134,13 @@ fn merge( // ... we just serialize this index merger in our new segment to merge the two segments. let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?; - let num_docs = merger.write(segment_serializer)?; + let max_doc = merger.write(segment_serializer)?; - let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs); - - Ok(SegmentEntry::new(segment_meta, delete_cursor, None)) + Ok(SegmentEntry::new( + merged_segment.with_max_doc(max_doc), + delete_cursor, + None, + )) } pub(crate) struct InnerSegmentUpdater { @@ -167,7 +169,7 @@ impl SegmentUpdater { delete_cursor: &DeleteCursor, ) -> crate::Result { let segments = index.searchable_segment_metas()?; - let segment_manager = SegmentManager::from_segments(segments, delete_cursor); + let segment_manager = SegmentManager::from_segments(&index, segments, delete_cursor); let pool = ThreadPoolBuilder::new() .name_prefix("segment_updater") .pool_size(1) @@ -228,10 +230,12 @@ impl SegmentUpdater { pub fn schedule_add_segment( &self, - segment_entry: SegmentEntry, + mut segment_entry: SegmentEntry, ) -> impl Future> { + // TODO temporary: serializing the segment at this point. let segment_updater = self.clone(); self.schedule_future(async move { + segment_entry.persist(segment_updater.index.directory().clone())?; segment_updater.segment_manager.add_segment(segment_entry); segment_updater.consider_merge_options().await; Ok(())