diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index 5cc2e5814..734452651 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -35,6 +35,7 @@ impl SegmentMetaInventory { segment_id, max_doc, deletes: None, + bundled: false, }; SegmentMeta::from(self.inventory.track(inner)) } @@ -81,6 +82,19 @@ impl SegmentMeta { self.tracked.segment_id } + pub fn with_bundled(self) -> SegmentMeta { + SegmentMeta::from(self.tracked.map(|inner| InnerSegmentMeta { + segment_id: inner.segment_id, + max_doc: inner.max_doc, + deletes: inner.deletes.clone(), + bundled: true, + })) + } + + pub fn is_bundled(&self) -> bool { + self.tracked.bundled + } + /// Returns the number of deleted documents. pub fn num_deleted_docs(&self) -> u32 { self.tracked @@ -107,8 +121,12 @@ impl SegmentMeta { /// It just joins the segment id with the extension /// associated to a segment component. 
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf { - let mut path = self.id().uuid_string(); - path.push_str(&*match component { + let suffix = self.suffix(component); + self.relative_path_from_suffix(&suffix) + } + + fn suffix(&self, component: SegmentComponent) -> String { + match component { SegmentComponent::POSTINGS => ".idx".to_string(), SegmentComponent::POSITIONS => ".pos".to_string(), SegmentComponent::POSITIONSSKIP => ".posidx".to_string(), @@ -117,7 +135,17 @@ impl SegmentMeta { SegmentComponent::FASTFIELDS => ".fast".to_string(), SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(), SegmentComponent::DELETE => format!(".{}.del", self.delete_opstamp().unwrap_or(0)), - }); + } + } + + /// Returns the relative path of the segment file with the given suffix. + /// + /// It joins the segment id and `suffix` with exactly one `.`; + /// leading `.` characters in `suffix` are stripped first. + pub fn relative_path_from_suffix(&self, suffix: &str) -> PathBuf { + let mut path = self.id().uuid_string(); + path.push('.'); + path.push_str(suffix.trim_start_matches('.')); PathBuf::from(path) } @@ -161,6 +189,7 @@ impl SegmentMeta { segment_id: inner_meta.segment_id, max_doc, deletes: None, + bundled: inner_meta.bundled, }); SegmentMeta { tracked } } @@ -175,6 +204,7 @@ impl SegmentMeta { segment_id: inner_meta.segment_id, max_doc: inner_meta.max_doc, deletes: Some(delete_meta), + bundled: inner_meta.bundled, }); SegmentMeta { tracked } } @@ -185,6 +215,7 @@ struct InnerSegmentMeta { segment_id: SegmentId, max_doc: u32, deletes: Option, + bundled: bool, } impl InnerSegmentMeta { diff --git a/src/core/segment.rs b/src/core/segment.rs index 910ce1371..0a7ba6e8c 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -8,10 +8,8 @@ use crate::directory::{ReadOnlyDirectory, ReadOnlySource, WritePtr}; use crate::indexer::segment_serializer::SegmentSerializer; use crate::schema::Schema; use crate::Opstamp; -use crate::Result; use std::fmt; use std::path::PathBuf; -use std::result; /// A segment is a piece 
of the index. #[derive(Clone)] @@ -83,23 +81,30 @@ impl Segment { } /// Open one of the component file for a *regular* read. - pub fn open_read( - &self, - component: SegmentComponent, - ) -> result::Result { + pub fn open_read(&self, component: SegmentComponent) -> Result { let path = self.relative_path(component); let source = self.index.directory().open_read(&path)?; Ok(source) } /// Open one of the component file for *regular* write. - pub fn open_write( + pub fn open_write(&mut self, component: SegmentComponent) -> Result { + let path = self.relative_path(component); + self.index.directory_mut().open_write(&path) + } + + pub fn open_bundle_writer(&mut self) -> Result { + let path = self.meta.relative_path_from_suffix("bundle"); + self.index.directory_mut().open_write(&path) + } + + pub(crate) fn open_write_in_directory( &mut self, component: SegmentComponent, - ) -> result::Result { + directory: &mut dyn Directory, + ) -> Result { let path = self.relative_path(component); - let write = self.index.directory_mut().open_write(&path)?; - Ok(write) + directory.open_write(&path) } } @@ -109,5 +114,5 @@ pub trait SerializableSegment { /// /// # Returns /// The number of documents in the segment. 
- fn write(&self, serializer: SegmentSerializer) -> Result; + fn write(&self, serializer: SegmentSerializer) -> crate::Result; } diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 94c666e43..e14db720f 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -1,10 +1,13 @@ -use crate::Result; +use crate::Directory; use crate::core::Segment; use crate::core::SegmentComponent; +use crate::directory::error::OpenWriteError; +use crate::directory::{DirectoryClone, RAMDirectory, TerminatingWrite, WritePtr}; use crate::fastfield::FastFieldSerializer; use crate::fieldnorm::FieldNormsSerializer; use crate::postings::InvertedIndexSerializer; +use crate::schema::Schema; use crate::store::StoreWriter; /// Segment serializer is in charge of laying out on disk @@ -14,25 +17,50 @@ pub struct SegmentSerializer { fast_field_serializer: FastFieldSerializer, fieldnorms_serializer: FieldNormsSerializer, postings_serializer: InvertedIndexSerializer, + bundle_writer: Option<(RAMDirectory, WritePtr)>, +} + +pub(crate) struct SegmentSerializerWriters { + postings_wrt: WritePtr, + positions_skip_wrt: WritePtr, + positions_wrt: WritePtr, + terms_wrt: WritePtr, + fast_field_wrt: WritePtr, + fieldnorms_wrt: WritePtr, + store_wrt: WritePtr, +} + +impl SegmentSerializerWriters { + pub(crate) fn for_segment(segment: &mut Segment) -> Result { + Ok(SegmentSerializerWriters { + postings_wrt: segment.open_write(SegmentComponent::POSTINGS)?, + positions_skip_wrt: segment.open_write(SegmentComponent::POSITIONSSKIP)?, + positions_wrt: segment.open_write(SegmentComponent::POSITIONS)?, + terms_wrt: segment.open_write(SegmentComponent::TERMS)?, + fast_field_wrt: segment.open_write(SegmentComponent::FASTFIELDS)?, + fieldnorms_wrt: segment.open_write(SegmentComponent::FIELDNORMS)?, + store_wrt: segment.open_write(SegmentComponent::STORE)?, + }) + } } impl SegmentSerializer { - /// Creates a new `SegmentSerializer`. 
- pub fn for_segment(segment: &mut Segment) -> Result { - let store_write = segment.open_write(SegmentComponent::STORE)?; - - let fast_field_write = segment.open_write(SegmentComponent::FASTFIELDS)?; - let fast_field_serializer = FastFieldSerializer::from_write(fast_field_write)?; - - let fieldnorms_write = segment.open_write(SegmentComponent::FIELDNORMS)?; - let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?; - - let postings_serializer = InvertedIndexSerializer::open(segment)?; + pub(crate) fn new(schema: Schema, writers: SegmentSerializerWriters) -> crate::Result { + let fast_field_serializer = FastFieldSerializer::from_write(writers.fast_field_wrt)?; + let fieldnorms_serializer = FieldNormsSerializer::from_write(writers.fieldnorms_wrt)?; + let postings_serializer = InvertedIndexSerializer::open( + schema, + writers.terms_wrt, + writers.postings_wrt, + writers.positions_wrt, + writers.positions_skip_wrt, + ); Ok(SegmentSerializer { - store_writer: StoreWriter::new(store_write), + store_writer: StoreWriter::new(writers.store_wrt), fast_field_serializer, fieldnorms_serializer, postings_serializer, + bundle_writer: None, }) } @@ -57,11 +85,15 @@ impl SegmentSerializer { } /// Finalize the segment serialization. 
- pub fn close(self) -> Result<()> { + pub fn close(mut self) -> crate::Result<()> { self.fast_field_serializer.close()?; self.postings_serializer.close()?; self.store_writer.close()?; self.fieldnorms_serializer.close()?; + if let Some((ram_directory, mut bundle_wrt)) = self.bundle_writer.take() { + ram_directory.serialize_bundle(&mut bundle_wrt)?; + bundle_wrt.terminate()?; + } Ok(()) } } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 16fbc6071..c59fd3b76 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -12,6 +12,7 @@ use crate::indexer::index_writer::advance_deletes; use crate::indexer::merge_operation::MergeOperationInventory; use crate::indexer::merger::IndexMerger; use crate::indexer::segment_manager::SegmentsStatus; +use crate::indexer::segment_serializer::SegmentSerializerWriters; use crate::indexer::stamper::Stamper; use crate::indexer::SegmentEntry; use crate::indexer::SegmentSerializer; @@ -132,7 +133,9 @@ fn merge( let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?; // ... we just serialize this index merger in our new segment to merge the two segments. 
- let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?; + let segment_serializer_wrts = SegmentSerializerWriters::for_segment(&mut merged_segment)?; + let segment_serializer = + SegmentSerializer::new(merged_segment.schema(), segment_serializer_wrts)?; let num_docs = merger.write(segment_serializer)?; diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 8ed1025ba..77564c3f4 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -3,7 +3,7 @@ use crate::core::Segment; use crate::core::SerializableSegment; use crate::fastfield::FastFieldsWriter; use crate::fieldnorm::FieldNormsWriter; -use crate::indexer::segment_serializer::SegmentSerializer; +use crate::indexer::segment_serializer::{SegmentSerializer, SegmentSerializerWriters}; use crate::postings::compute_table_size; use crate::postings::MultiFieldPostingsWriter; use crate::schema::FieldType; @@ -69,7 +69,8 @@ impl SegmentWriter { schema: &Schema, ) -> Result { let table_num_bits = initial_table_size(memory_budget)?; - let segment_serializer = SegmentSerializer::for_segment(&mut segment)?; + let segment_serializer_wrts = SegmentSerializerWriters::for_segment(&mut segment)?; + let segment_serializer = SegmentSerializer::new(segment.schema(), segment_serializer_wrts)?; let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits); let tokenizers = schema .fields() diff --git a/src/postings/mod.rs b/src/postings/mod.rs index b66beb413..10a50a243 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -75,7 +75,7 @@ pub mod tests { let schema = schema_builder.build(); let index = Index::create_in_ram(schema); let mut segment = index.new_segment(); - let mut posting_serializer = InvertedIndexSerializer::open(&mut segment).unwrap(); + let mut posting_serializer = InvertedIndexSerializer::for_segment(&mut segment).unwrap(); { let mut field_serializer = posting_serializer.new_field(text_field, 120 * 4).unwrap(); 
field_serializer.new_term("abc".as_bytes()).unwrap(); diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index 79e362a62..c80cab703 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -10,8 +10,8 @@ use crate::postings::USE_SKIP_INFO_LIMIT; use crate::schema::Schema; use crate::schema::{Field, FieldEntry, FieldType}; use crate::termdict::{TermDictionaryBuilder, TermOrdinal}; -use crate::DocId; use crate::Result; +use crate::{Directory, DocId}; use std::io::{self, Write}; /// `InvertedIndexSerializer` is in charge of serializing @@ -54,33 +54,36 @@ pub struct InvertedIndexSerializer { } impl InvertedIndexSerializer { - /// Open a new `InvertedIndexSerializer` for the given segment - fn create( - terms_write: CompositeWrite, - postings_write: CompositeWrite, - positions_write: CompositeWrite, - positionsidx_write: CompositeWrite, - schema: Schema, - ) -> Result { - Ok(InvertedIndexSerializer { - terms_write, - postings_write, - positions_write, - positionsidx_write, + pub(crate) fn for_segment(segment: &mut Segment) -> crate::Result { + let schema = segment.schema(); + use crate::core::SegmentComponent; + let terms_wrt = segment.open_write(SegmentComponent::TERMS)?; + let postings_wrt = segment.open_write(SegmentComponent::POSTINGS)?; + let positions_wrt = segment.open_write(SegmentComponent::POSITIONS)?; + let positions_idx_wrt = segment.open_write(SegmentComponent::POSITIONSSKIP)?; + Ok(Self::open( schema, - }) + terms_wrt, + postings_wrt, + positions_wrt, + positions_idx_wrt, + )) } - /// Open a new `PostingsSerializer` for the given segment - pub fn open(segment: &mut Segment) -> Result { - use crate::SegmentComponent::{POSITIONS, POSITIONSSKIP, POSTINGS, TERMS}; - InvertedIndexSerializer::create( - CompositeWrite::wrap(segment.open_write(TERMS)?), - CompositeWrite::wrap(segment.open_write(POSTINGS)?), - CompositeWrite::wrap(segment.open_write(POSITIONS)?), - CompositeWrite::wrap(segment.open_write(POSITIONSSKIP)?), - 
segment.schema(), - ) + pub(crate) fn open( + schema: Schema, + terms_wrt: WritePtr, + postings_wrt: WritePtr, + positions_wrt: WritePtr, + positions_idx_wrt: WritePtr, + ) -> InvertedIndexSerializer { + InvertedIndexSerializer { + terms_write: CompositeWrite::wrap(terms_wrt), + postings_write: CompositeWrite::wrap(postings_wrt), + positions_write: CompositeWrite::wrap(positions_wrt), + positionsidx_write: CompositeWrite::wrap(positions_idx_wrt), + schema, + } } /// Must be called before starting pushing terms of