renames and refactoring

This commit is contained in:
Pascal Seitz
2022-06-21 16:10:48 +08:00
parent 79e42d4a6d
commit 0bc6b4a117

View File

@@ -29,14 +29,13 @@ pub struct StoreWriter {
// the channel to send data to the compressor thread.
compressor_sender: SyncSender<BlockCompressorMessage>,
// the handle to check for errors on the thread
compressor_thread_handle: JoinHandle<io::Result<()>>,
}
enum BlockCompressorMessage {
AddBlock(DocumentBlock),
Stack((StoreReader, DocumentBlock)),
Stack(StoreReader),
}
impl StoreWriter {
@@ -61,10 +60,10 @@ impl StoreWriter {
while let Ok(packet) = block_receiver.recv() {
match packet {
BlockCompressorMessage::AddBlock(block) => {
block_compressor.compress_block(block)?;
block_compressor.compress_block_and_write(block)?;
}
BlockCompressorMessage::Stack((store_reader, block)) => {
block_compressor.stack(block, store_reader)?;
BlockCompressorMessage::Stack(store_reader) => {
block_compressor.stack(store_reader)?;
}
}
}
@@ -95,22 +94,25 @@ impl StoreWriter {
/// Checks if the current block is full, and if so, compresses and flushes it.
fn check_flush_block(&mut self) -> io::Result<()> {
if self.current_block.len() > self.block_size {
let block = self.get_current_block();
self.compressor_sender
.send(BlockCompressorMessage::AddBlock(block))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
self.send_current_block_to_compressor()?;
}
Ok(())
}
fn get_current_block(&mut self) -> DocumentBlock {
/// Flushes current uncompressed block and sends to compressor.
fn send_current_block_to_compressor(&mut self) -> io::Result<()> {
let block = DocumentBlock {
data: self.current_block.to_owned(),
num_docs_in_block: self.num_docs_in_current_block,
};
self.current_block.clear();
self.num_docs_in_current_block = 0;
block
if !block.is_empty() {
self.compressor_sender
.send(BlockCompressorMessage::AddBlock(block))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
}
Ok(())
}
/// Store a new document.
@@ -150,10 +152,10 @@ impl StoreWriter {
/// in the store and adding them one by one, as the store's data will
/// not be decompressed and then recompressed.
pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
self.check_flush_block()?;
let block = self.get_current_block();
// We flush the current block first before stacking
self.send_current_block_to_compressor()?;
self.compressor_sender
.send(BlockCompressorMessage::Stack((store_reader, block)))
.send(BlockCompressorMessage::Stack(store_reader))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
Ok(())
@@ -164,12 +166,7 @@ impl StoreWriter {
/// Compress the last unfinished block if any,
/// and serializes the skip list index on disc.
pub fn close(mut self) -> io::Result<()> {
let block = self.get_current_block();
if !block.is_empty() {
self.compressor_sender
.send(BlockCompressorMessage::AddBlock(block))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
}
self.send_current_block_to_compressor()?;
drop(self.compressor_sender);
self.compressor_thread_handle
@@ -183,7 +180,7 @@ impl StoreWriter {
/// BlockCompressor is separated from StoreWriter, to be run in an own thread
pub struct BlockCompressor {
compressor: Compressor,
doc: DocId,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
intermediary_buffer: Vec<u8>,
writer: CountingWriter<WritePtr>,
@@ -204,14 +201,14 @@ impl BlockCompressor {
fn new(compressor: Compressor, writer: WritePtr) -> Self {
Self {
compressor,
doc: 0,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
intermediary_buffer: Vec::new(),
writer: CountingWriter::wrap(writer),
}
}
fn compress_block(&mut self, block: DocumentBlock) -> io::Result<()> {
fn compress_block_and_write(&mut self, block: DocumentBlock) -> io::Result<()> {
assert!(block.num_docs_in_block > 0);
self.intermediary_buffer.clear();
self.compressor
@@ -222,7 +219,7 @@ impl BlockCompressor {
let end_offset = self.writer.written_bytes() as usize;
self.register_checkpoint(Checkpoint {
doc_range: self.doc..self.doc + block.num_docs_in_block,
doc_range: self.first_doc_in_block..self.first_doc_in_block + block.num_docs_in_block,
byte_range: start_offset..end_offset,
});
Ok(())
@@ -230,18 +227,15 @@ impl BlockCompressor {
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint.clone());
self.doc = checkpoint.doc_range.end;
self.first_doc_in_block = checkpoint.doc_range.end;
}
/// Stacks a store reader on top of the documents written so far.
/// This method is an optimization compared to iterating over the documents
/// in the store and adding them one by one, as the store's data will
/// not be decompressed and then recompressed.
fn stack(&mut self, block: DocumentBlock, store_reader: StoreReader) -> io::Result<()> {
if !block.is_empty() {
self.compress_block(block)?;
}
let doc_shift = self.doc;
fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
let doc_shift = self.first_doc_in_block;
let start_shift = self.writer.written_bytes() as usize;
// just bulk write all of the block of the given reader.