diff --git a/src/core/segment.rs b/src/core/segment.rs index 113706e78..da5efb7b4 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -3,19 +3,59 @@ use crate::core::Index; use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::directory::error::{OpenReadError, OpenWriteError}; -use crate::directory::Directory; +use crate::directory::{Directory, ManagedDirectory, RAMDirectory}; use crate::directory::{ReadOnlySource, WritePtr}; use crate::indexer::segment_serializer::SegmentSerializer; use crate::schema::Schema; use crate::Opstamp; use std::fmt; +use std::ops::{Deref, DerefMut}; use std::path::PathBuf; +#[derive(Clone)] +pub(crate) enum SegmentDirectory { + Persisted(ManagedDirectory), + Volatile(RAMDirectory), +} + +impl SegmentDirectory { + pub fn new_volatile() -> SegmentDirectory { + SegmentDirectory::Volatile(RAMDirectory::default()) + } +} + +impl From for SegmentDirectory { + fn from(directory: ManagedDirectory) -> Self { + SegmentDirectory::Persisted(directory) + } +} + +impl Deref for SegmentDirectory { + type Target = dyn Directory; + + fn deref(&self) -> &Self::Target { + match self { + SegmentDirectory::Volatile(dir) => dir, + SegmentDirectory::Persisted(dir) => dir, + } + } +} + +impl DerefMut for SegmentDirectory { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + SegmentDirectory::Volatile(dir) => dir, + SegmentDirectory::Persisted(dir) => dir, + } + } +} + /// A segment is a piece of the index. #[derive(Clone)] pub struct Segment { - index: Index, + schema: Schema, meta: SegmentMeta, + directory: SegmentDirectory, } impl fmt::Debug for Segment { @@ -25,19 +65,51 @@ impl fmt::Debug for Segment { } impl Segment { - /// Creates a new segment given an `Index` and a `SegmentId` - pub(crate) fn for_index(index: Index, meta: SegmentMeta) -> Segment { - Segment { index, meta } + pub(crate) fn new_persisted( + meta: SegmentMeta, + directory: ManagedDirectory, + schema: Schema, + ) -> Segment { + Segment { + meta, + schema, + directory: SegmentDirectory::from(directory), + } } - /// Returns the index the segment belongs to. - pub fn index(&self) -> &Index { - &self.index + /// Creates a new segment that embeds its own `RAMDirectory`. + /// + /// That segment is entirely dissociated from the index directory. + /// It will be persisted by a background thread in charge of IO. + pub fn new_volatile(meta: SegmentMeta, schema: Schema) -> Segment { + Segment { + schema, + meta, + directory: SegmentDirectory::new_volatile(), + } + } + + /// Creates a new segment given an `Index` and a `SegmentId` + pub(crate) fn for_index(index: Index, meta: SegmentMeta) -> Segment { + let segment_directory = index.directory().clone(); + Segment::new_persisted(meta, segment_directory, index.schema()) + } + + pub fn persist(&mut self, mut dest_directory: ManagedDirectory) -> crate::Result<()> { + if let SegmentDirectory::Persisted(_) = self.directory { + // this segment is already persisted. + return Ok(()); + } + if let SegmentDirectory::Volatile(ram_directory) = &self.directory { + ram_directory.persist(&mut dest_directory)?; + } + self.directory = SegmentDirectory::Persisted(dest_directory); + Ok(()) } /// Returns our index's schema. pub fn schema(&self) -> Schema { - self.index.schema() + self.schema.clone() } /// Returns the segment meta-information @@ -51,16 +123,18 @@ impl Segment { /// as we finalize a fresh new segment. pub(crate) fn with_max_doc(self, max_doc: u32) -> Segment { Segment { - index: self.index, + schema: self.schema, meta: self.meta.with_max_doc(max_doc), + directory: self.directory, } } #[doc(hidden)] pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment { Segment { - index: self.index, + schema: self.schema, meta: self.meta.with_delete_meta(num_deleted_docs, opstamp), + directory: self.directory, } } @@ -80,15 +154,15 @@ impl Segment { /// Open one of the component file for a *regular* read. pub fn open_read(&self, component: SegmentComponent) -> Result { let path = self.relative_path(component); - let source = self.index.directory().open_read(&path)?; + let source = self.directory.open_read(&path)?; Ok(source) } /// Open one of the component file for *regular* write. pub fn open_write(&mut self, component: SegmentComponent) -> Result { let path = self.relative_path(component); - let write = self.index.directory_mut().open_write(&path)?; - Ok(write) + let wrt = self.directory.open_write(&path)?; + Ok(wrt) } } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index a1e5eaa13..97602797e 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -23,6 +23,7 @@ use crate::indexer::SegmentWriter; use crate::schema::Document; use crate::schema::IndexRecordOption; use crate::schema::Term; +use crate::tokenizer::TokenizerManager; use crate::Opstamp; use crossbeam::channel; use futures::executor::block_on; @@ -194,11 +195,13 @@ fn index_documents( segment: Segment, grouped_document_iterator: &mut dyn Iterator, segment_updater: &mut SegmentUpdater, + tokenizers: &TokenizerManager, mut delete_cursor: DeleteCursor, ) -> crate::Result { let schema = segment.schema(); - let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?; + let mut segment_writer = + SegmentWriter::for_segment(memory_budget, segment.clone(), &schema, tokenizers)?; for document_group in grouped_document_iterator { for doc in document_group { segment_writer.add_document(doc, &schema)?; @@ -439,6 +442,7 @@ impl IndexWriter { segment, &mut document_iterator, &mut segment_updater, + index.tokenizers(), delete_cursor.clone(), )?; } diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index a7afc70b2..010c73ca1 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -13,7 +13,7 @@ use crate::schema::Value; use crate::schema::{Field, FieldEntry}; use crate::store::StoreWriter; use crate::tokenizer::{BoxTokenStream, PreTokenizedStream}; -use crate::tokenizer::{FacetTokenizer, TextAnalyzer}; +use crate::tokenizer::{TokenizerManager, FacetTokenizer, TextAnalyzer}; use crate::tokenizer::{TokenStreamChain, Tokenizer}; use crate::Opstamp; use crate::{DocId, SegmentComponent}; @@ -67,6 +67,7 @@ impl SegmentWriter { memory_budget: usize, segment: Segment, schema: &Schema, + tokenizer_manager: &TokenizerManager, ) -> crate::Result { let table_num_bits = initial_table_size(memory_budget)?; let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits); @@ -78,7 +79,7 @@ impl SegmentWriter { .get_indexing_options() .and_then(|text_index_option| { let tokenizer_name = &text_index_option.tokenizer(); - segment.index().tokenizers().get(tokenizer_name) + tokenizer_manager.get(tokenizer_name) }), _ => None, }, diff --git a/src/postings/mod.rs b/src/postings/mod.rs index b66beb413..1adbb77af 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -220,7 +220,8 @@ pub mod tests { { let mut segment_writer = - SegmentWriter::for_segment(3_000_000, segment.clone(), &schema).unwrap(); + SegmentWriter::for_segment(3_000_000, segment.clone(), &schema, index.tokenizers()) + .unwrap(); { let mut doc = Document::default(); // checking that position works if the field has two values