diff --git a/src/directory/mod.rs b/src/directory/mod.rs index df5e55d81..48861b815 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -13,6 +13,7 @@ mod footer; mod managed_directory; mod ram_directory; mod read_only_source; +mod spilling_writer; mod watch_event_router; /// Errors specific to the directory module. @@ -22,6 +23,7 @@ pub use self::directory::DirectoryLock; pub use self::directory::{Directory, DirectoryClone}; pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; pub use self::ram_directory::RAMDirectory; +pub(crate) use self::spilling_writer::SpillingWriter; pub use self::read_only_source::ReadOnlySource; pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle}; use std::io::{self, BufWriter, Write}; @@ -79,10 +81,16 @@ impl TerminatingWrite for BufWriter { } } +impl TerminatingWrite for Vec { + fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> { + Ok(()) + } +} + #[cfg(test)] impl<'a> TerminatingWrite for &'a mut Vec { fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> { - self.flush() + Ok(()) } } diff --git a/src/directory/spilling_writer.rs b/src/directory/spilling_writer.rs new file mode 100644 index 000000000..ed0c3a54c --- /dev/null +++ b/src/directory/spilling_writer.rs @@ -0,0 +1,180 @@ +use crate::directory::{WritePtr, TerminatingWrite}; +use std::io::{self, Write}; + +enum SpillingState { + Buffer { + buffer: Vec, + capacity: usize, + write_factory: Box io::Result>, + }, + Spilled(WritePtr), +} + +impl SpillingState { + + fn new( + limit: usize, + write_factory: Box io::Result>, + ) -> SpillingState { + SpillingState::Buffer { + buffer: Vec::with_capacity(limit), + capacity: limit, + write_factory, + } + } + + fn reserve(self, extra_capacity: usize) -> io::Result { + match self { + SpillingState::Buffer { + buffer, + capacity, + write_factory, + } => { + if capacity >= extra_capacity { + Ok(SpillingState::Buffer { + buffer, + capacity: capacity - extra_capacity, + write_factory, + }) + } else { + let mut wrt = write_factory()?; + wrt.write_all(&buffer[..])?; + Ok(SpillingState::Spilled(wrt)) + } + } + SpillingState::Spilled(wrt) => Ok(SpillingState::Spilled(wrt)), + } + } +} + +pub struct SpillingWriter { + state: Option, +} + +impl SpillingWriter { + pub fn new( + limit: usize, + write_factory: Box io::Result>, + ) -> SpillingWriter { + let state = SpillingState::new(limit, write_factory); + SpillingWriter { + state: Some(state) + } + } + + pub fn flush_and_finalize(self) -> io::Result<()> { + if let SpillingState::Buffer { + buffer, + write_factory, + .. + } = self.state.expect("State cannot be none") { + let mut wrt = write_factory()?; + wrt.write_all(&buffer[..])?; + wrt.flush()?; + wrt.terminate()?; + } + Ok(()) + } + + pub fn finalize(self) -> io::Result { + match self.state.expect("state cannot be None") { + SpillingState::Spilled(mut wrt) => { + wrt.flush()?; + Ok(SpillingResult::Spilled) + } + SpillingState::Buffer { buffer, .. } => Ok(SpillingResult::Buffer(buffer)), + + } + } +} + +pub enum SpillingResult { + Spilled, + Buffer(Vec), +} + +impl io::Write for SpillingWriter { + fn write(&mut self, payload: &[u8]) -> io::Result { + self.write_all(payload)?; + Ok(payload.len()) + } + + fn flush(&mut self) -> io::Result<()> { + if let Some(SpillingState::Spilled(wrt)) = &mut self.state { + wrt.flush()?; + } + Ok(()) + } + + fn write_all(&mut self, payload: &[u8]) -> io::Result<()> { + let state_opt: Option> = self.state + .take() + .map(|mut state| { + state = state.reserve(payload.len())?; + match &mut state { + SpillingState::Buffer { buffer, .. } => { + buffer.extend_from_slice(payload); + } + SpillingState::Spilled(wrt) => { + wrt.write_all(payload)?; + } + } + Ok(state) + }); + self.state = state_opt.transpose()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::SpillingWriter; + use crate::directory::spilling_writer::SpillingResult; + use crate::directory::RAMDirectory; + use crate::Directory; + use std::io::{self, Write}; + use std::path::Path; + + #[test] + fn test_no_spilling() { + let ram_directory = RAMDirectory::create(); + let mut ram_directory_clone = ram_directory.clone(); + let path = Path::new("test"); + let write_factory = Box::new(move || { + ram_directory_clone + .open_write(path) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + }); + let mut spilling_wrt = SpillingWriter::new(10, write_factory); + assert!(spilling_wrt.write_all(b"abcd").is_ok()); + if let SpillingResult::Buffer(buf) = spilling_wrt.finalize().unwrap() { + assert_eq!(buf, b"abcd") + } else { + panic!("spill writer should not have spilled"); + } + assert!(!ram_directory.exists(path)); + } + + #[test] + fn test_spilling() { + let ram_directory = RAMDirectory::create(); + let mut ram_directory_clone = ram_directory.clone(); + let path = Path::new("test"); + let write_factory = Box::new(move || { + ram_directory_clone + .open_write(path) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + }); + let mut spilling_wrt = SpillingWriter::new(10, write_factory); + assert!(spilling_wrt.write_all(b"abcd").is_ok()); + assert!(spilling_wrt.write_all(b"efghijklmnop").is_ok()); + if let SpillingResult::Spilled = spilling_wrt.finalize().unwrap() { + } else { + panic!("spill writer should have spilled"); + } + assert_eq!( + ram_directory.atomic_read(path).unwrap(), + b"abcdefghijklmnop" + ); + } +} diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 5a9b1daeb..560cb965d 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -2,6 +2,7 @@ use crate::common::MAX_DOC_LIMIT; use crate::core::Segment; use crate::core::SegmentReader; use crate::core::SerializableSegment; +use crate::directory::WritePtr; use crate::docset::DocSet; use crate::fastfield::BytesFastFieldReader; use crate::fastfield::DeleteBitSet; @@ -661,7 +662,8 @@ impl IndexMerger { Ok(term_ordinal_mappings) } - fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> crate::Result<()> { + pub fn write_storable_fields(&self, store_wrt: WritePtr) -> crate::Result<()> { + let mut store_writer = StoreWriter::new(store_wrt); for reader in &self.readers { let store_reader = reader.get_store_reader(); if reader.num_deleted_docs() > 0 { @@ -673,6 +675,7 @@ impl IndexMerger { store_writer.stack(&store_reader)?; } } + store_writer.close()?; Ok(()) } } @@ -682,7 +685,6 @@ impl SerializableSegment for IndexMerger { let term_ord_mappings = self.write_postings(serializer.get_postings_serializer())?; self.write_fieldnorms(serializer.get_fieldnorms_serializer())?; self.write_fast_fields(serializer.get_fast_field_serializer(), term_ord_mappings)?; - self.write_storable_fields(serializer.get_store_writer())?; serializer.close()?; Ok(self.max_doc) } diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index d259b6bb5..45e94bfb8 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -3,12 +3,10 @@ use crate::core::SegmentComponent; use crate::fastfield::FastFieldSerializer; use crate::fieldnorm::FieldNormsSerializer; use crate::postings::InvertedIndexSerializer; -use crate::store::StoreWriter; /// Segment serializer is in charge of laying out on disk /// the data accumulated and sorted by the `SegmentWriter`. pub struct SegmentSerializer { - store_writer: StoreWriter, fast_field_serializer: FastFieldSerializer, fieldnorms_serializer: FieldNormsSerializer, postings_serializer: InvertedIndexSerializer, @@ -17,8 +15,6 @@ pub struct SegmentSerializer { impl SegmentSerializer { /// Creates a new `SegmentSerializer`. pub fn for_segment(segment: &mut Segment) -> crate::Result { - let store_write = segment.open_write(SegmentComponent::STORE)?; - let fast_field_write = segment.open_write(SegmentComponent::FASTFIELDS)?; let fast_field_serializer = FastFieldSerializer::from_write(fast_field_write)?; @@ -27,7 +23,6 @@ impl SegmentSerializer { let postings_serializer = InvertedIndexSerializer::open(segment)?; Ok(SegmentSerializer { - store_writer: StoreWriter::new(store_write), fast_field_serializer, fieldnorms_serializer, postings_serializer, @@ -49,16 +44,10 @@ impl SegmentSerializer { &mut self.fieldnorms_serializer } - /// Accessor to the `StoreWriter`. - pub fn get_store_writer(&mut self) -> &mut StoreWriter { - &mut self.store_writer - } - /// Finalize the segment serialization. pub fn close(self) -> crate::Result<()> { self.fast_field_serializer.close()?; self.postings_serializer.close()?; - self.store_writer.close()?; self.fieldnorms_serializer.close()?; Ok(()) } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 69607a0ce..5d3c09562 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -18,7 +18,7 @@ use crate::indexer::SegmentSerializer; use crate::indexer::{DefaultMergePolicy, MergePolicy}; use crate::indexer::{MergeCandidate, MergeOperation}; use crate::schema::Schema; -use crate::Opstamp; +use crate::{Opstamp, SegmentComponent}; use futures::channel::oneshot; use futures::executor::{ThreadPool, ThreadPoolBuilder}; use futures::future::Future; @@ -134,8 +134,10 @@ fn merge( // ... we just serialize this index merger in our new segment to merge the two segments. let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?; - let num_docs = merger.write(segment_serializer)?; + let store_wrt = merged_segment.open_write(SegmentComponent::STORE)?; + merger.write_storable_fields(store_wrt)?; + let num_docs = merger.write(segment_serializer)?; let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs); Ok(SegmentEntry::new(segment_meta, delete_cursor, None)) diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 4b2b1f35d..a7afc70b2 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -11,13 +11,15 @@ use crate::schema::Schema; use crate::schema::Term; use crate::schema::Value; use crate::schema::{Field, FieldEntry}; +use crate::store::StoreWriter; use crate::tokenizer::{BoxTokenStream, PreTokenizedStream}; use crate::tokenizer::{FacetTokenizer, TextAnalyzer}; use crate::tokenizer::{TokenStreamChain, Tokenizer}; -use crate::DocId; use crate::Opstamp; +use crate::{DocId, SegmentComponent}; use std::io; use std::str; +use crate::directory::SpillingWriter; /// Computes the initial size of the hash table. /// @@ -43,11 +45,12 @@ fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result { pub struct SegmentWriter { max_doc: DocId, multifield_postings: MultiFieldPostingsWriter, - segment_serializer: SegmentSerializer, + segment: Segment, fast_field_writers: FastFieldsWriter, fieldnorms_writer: FieldNormsWriter, doc_opstamps: Vec, tokenizers: Vec>, + store_writer: StoreWriter, } impl SegmentWriter { @@ -62,11 +65,10 @@ impl SegmentWriter { /// - schema pub fn for_segment( memory_budget: usize, - mut segment: Segment, + segment: Segment, schema: &Schema, ) -> crate::Result { let table_num_bits = initial_table_size(memory_budget)?; - let segment_serializer = SegmentSerializer::for_segment(&mut segment)?; let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits); let tokenizers = schema .fields() @@ -82,14 +84,22 @@ impl SegmentWriter { }, ) .collect(); + let mut segment_clone = segment.clone(); + let spilling_wrt = SpillingWriter::new(1_000, Box::new(move || { + segment_clone + .open_write(SegmentComponent::STORE) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + })); + let store_writer = StoreWriter::new(spilling_wrt); Ok(SegmentWriter { max_doc: 0, multifield_postings, fieldnorms_writer: FieldNormsWriter::for_schema(schema), - segment_serializer, + segment, fast_field_writers: FastFieldsWriter::from_schema(schema), doc_opstamps: Vec::with_capacity(1_000), tokenizers, + store_writer, }) } @@ -99,11 +109,14 @@ impl SegmentWriter { /// be used afterwards. pub fn finalize(mut self) -> crate::Result> { self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc); + let spilling_wrt = self.store_writer.close()?; + spilling_wrt.flush_and_finalize()?; + let segment_serializer = SegmentSerializer::for_segment(&mut self.segment)?; write( &self.multifield_postings, &self.fast_field_writers, &self.fieldnorms_writer, - self.segment_serializer, + segment_serializer, )?; Ok(self.doc_opstamps) } @@ -246,8 +259,7 @@ impl SegmentWriter { } doc.filter_fields(|field| schema.get_field_entry(field).is_stored()); doc.prepare_for_store(); - let doc_writer = self.segment_serializer.get_store_writer(); - doc_writer.store(&doc)?; + self.store_writer.store(&doc)?; self.max_doc += 1; Ok(()) } diff --git a/src/store/writer.rs b/src/store/writer.rs index 5ddda2c7f..c74a16ad9 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -3,8 +3,6 @@ use super::skiplist::SkipListBuilder; use super::StoreReader; use crate::common::CountingWriter; use crate::common::{BinarySerializable, VInt}; -use crate::directory::TerminatingWrite; -use crate::directory::WritePtr; use crate::schema::Document; use crate::DocId; use std::io::{self, Write}; @@ -19,20 +17,20 @@ const BLOCK_SIZE: usize = 16_384; /// /// The skip list index on the other hand, is built in memory. /// -pub struct StoreWriter { +pub struct StoreWriter { doc: DocId, offset_index_writer: SkipListBuilder, - writer: CountingWriter, + writer: CountingWriter, intermediary_buffer: Vec, current_block: Vec, } -impl StoreWriter { +impl StoreWriter { /// Create a store writer. /// /// The store writer will writes blocks on disc as /// document are added. - pub fn new(writer: WritePtr) -> StoreWriter { + pub fn new(writer: W) -> StoreWriter { StoreWriter { doc: 0, offset_index_writer: SkipListBuilder::new(4), @@ -102,7 +100,7 @@ impl StoreWriter { /// /// Compress the last unfinished block if any, /// and serializes the skip list index on disc. - pub fn close(mut self) -> io::Result<()> { + pub fn close(mut self) -> io::Result { if !self.current_block.is_empty() { self.write_and_compress_block()?; } @@ -110,6 +108,9 @@ impl StoreWriter { self.offset_index_writer.write(&mut self.writer)?; header_offset.serialize(&mut self.writer)?; self.doc.serialize(&mut self.writer)?; - self.writer.terminate() + self.writer.flush()?; + let (wrt, _) = self.writer.finish()?; + Ok(wrt) + } }