diff --git a/src/store/writer.rs b/src/store/writer.rs index cd80cd908..244bf72fc 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -29,14 +29,13 @@ pub struct StoreWriter { // the channel to send data to the compressor thread. compressor_sender: SyncSender, - // the handle to check for errors on the thread compressor_thread_handle: JoinHandle>, } enum BlockCompressorMessage { AddBlock(DocumentBlock), - Stack((StoreReader, DocumentBlock)), + Stack(StoreReader), } impl StoreWriter { @@ -61,10 +60,10 @@ impl StoreWriter { while let Ok(packet) = block_receiver.recv() { match packet { BlockCompressorMessage::AddBlock(block) => { - block_compressor.compress_block(block)?; + block_compressor.compress_block_and_write(block)?; } - BlockCompressorMessage::Stack((store_reader, block)) => { - block_compressor.stack(block, store_reader)?; + BlockCompressorMessage::Stack(store_reader) => { + block_compressor.stack(store_reader)?; } } } @@ -95,22 +94,25 @@ impl StoreWriter { /// Checks if the current block is full, and if so, compresses and flushes it. fn check_flush_block(&mut self) -> io::Result<()> { if self.current_block.len() > self.block_size { - let block = self.get_current_block(); - self.compressor_sender - .send(BlockCompressorMessage::AddBlock(block)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + self.send_current_block_to_compressor()?; } Ok(()) } - fn get_current_block(&mut self) -> DocumentBlock { + /// Flushes current uncompressed block and sends to compressor. + fn send_current_block_to_compressor(&mut self) -> io::Result<()> { let block = DocumentBlock { data: self.current_block.to_owned(), num_docs_in_block: self.num_docs_in_current_block, }; self.current_block.clear(); self.num_docs_in_current_block = 0; - block + if !block.is_empty() { + self.compressor_sender + .send(BlockCompressorMessage::AddBlock(block)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + } + Ok(()) } /// Store a new document. @@ -150,10 +152,10 @@ impl StoreWriter { /// in the store and adding them one by one, as the store's data will /// not be decompressed and then recompressed. pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { - self.check_flush_block()?; - let block = self.get_current_block(); + // We flush the current block first before stacking + self.send_current_block_to_compressor()?; self.compressor_sender - .send(BlockCompressorMessage::Stack((store_reader, block))) + .send(BlockCompressorMessage::Stack(store_reader)) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; Ok(()) @@ -164,12 +166,7 @@ impl StoreWriter { /// Compress the last unfinished block if any, /// and serializes the skip list index on disc. pub fn close(mut self) -> io::Result<()> { - let block = self.get_current_block(); - if !block.is_empty() { - self.compressor_sender - .send(BlockCompressorMessage::AddBlock(block)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - } + self.send_current_block_to_compressor()?; drop(self.compressor_sender); self.compressor_thread_handle @@ -183,7 +180,7 @@ impl StoreWriter { /// BlockCompressor is separated from StoreWriter, to be run in an own thread pub struct BlockCompressor { compressor: Compressor, - doc: DocId, + first_doc_in_block: DocId, offset_index_writer: SkipIndexBuilder, intermediary_buffer: Vec, writer: CountingWriter, @@ -204,14 +201,14 @@ impl BlockCompressor { fn new(compressor: Compressor, writer: WritePtr) -> Self { Self { compressor, - doc: 0, + first_doc_in_block: 0, offset_index_writer: SkipIndexBuilder::new(), intermediary_buffer: Vec::new(), writer: CountingWriter::wrap(writer), } } - fn compress_block(&mut self, block: DocumentBlock) -> io::Result<()> { + fn compress_block_and_write(&mut self, block: DocumentBlock) -> io::Result<()> { assert!(block.num_docs_in_block > 0); self.intermediary_buffer.clear(); self.compressor @@ -222,7 +219,7 @@ impl BlockCompressor { let end_offset = self.writer.written_bytes() as usize; self.register_checkpoint(Checkpoint { - doc_range: self.doc..self.doc + block.num_docs_in_block, + doc_range: self.first_doc_in_block..self.first_doc_in_block + block.num_docs_in_block, byte_range: start_offset..end_offset, }); Ok(()) @@ -230,18 +227,15 @@ impl BlockCompressor { fn register_checkpoint(&mut self, checkpoint: Checkpoint) { self.offset_index_writer.insert(checkpoint.clone()); - self.doc = checkpoint.doc_range.end; + self.first_doc_in_block = checkpoint.doc_range.end; } /// Stacks a store reader on top of the documents written so far. /// This method is an optimization compared to iterating over the documents /// in the store and adding them one by one, as the store's data will /// not be decompressed and then recompressed. - fn stack(&mut self, block: DocumentBlock, store_reader: StoreReader) -> io::Result<()> { - if !block.is_empty() { - self.compress_block(block)?; - } - let doc_shift = self.doc; + fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { + let doc_shift = self.first_doc_in_block; let start_shift = self.writer.written_bytes() as usize; // just bulk write all of the block of the given reader.