Merge pull request #1389 from quickwit-oss/doc_writer_thread

use separate thread to compress block store
This commit is contained in:
PSeitz
2022-06-23 16:17:41 +08:00
committed by GitHub
8 changed files with 179 additions and 77 deletions

View File

@@ -62,7 +62,7 @@ impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
pub struct AntiCallToken(());
/// Trait used to indicate when no more writes need to be done on a writer
pub trait TerminatingWrite: Write {
pub trait TerminatingWrite: Write + Send {
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
fn terminate(mut self) -> io::Result<()>
where Self: Sized {

View File

@@ -1081,7 +1081,7 @@ impl IndexMerger {
store_writer.store_bytes(&doc_bytes)?;
}
} else {
store_writer.stack(&store_reader)?;
store_writer.stack(store_reader)?;
}
}
}

View File

@@ -42,7 +42,7 @@ impl SegmentSerializer {
let blocksize = segment.index().settings().docstore_blocksize;
Ok(SegmentSerializer {
segment,
store_writer: StoreWriter::new(store_write, compressor, blocksize),
store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,

View File

@@ -382,7 +382,7 @@ fn remap_and_write(
let block_size = serializer.segment().index().settings().docstore_blocksize;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
StoreWriter::new(store_write, compressor, block_size),
StoreWriter::new(store_write, compressor, block_size)?,
);
old_store_writer.close()?;
let store_read = StoreReader::open(

View File

@@ -54,7 +54,7 @@ mod tests {
fn test_skip_index_empty() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
@@ -70,7 +70,7 @@ mod tests {
byte_range: 0..3,
};
skip_index_builder.insert(checkpoint.clone());
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
@@ -108,7 +108,7 @@ mod tests {
for checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint.clone());
}
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
assert_eq!(
@@ -167,7 +167,7 @@ mod tests {
for checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint.clone());
}
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
.checkpoints()
@@ -238,7 +238,7 @@ mod tests {
skip_index_builder.insert(checkpoint);
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
skip_index_builder.serialize_into(&mut buffer).unwrap();
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);

View File

@@ -87,7 +87,7 @@ impl SkipIndexBuilder {
}
}
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
pub fn serialize_into<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {

View File

@@ -88,7 +88,7 @@ pub mod tests {
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
for i in 0..num_docs {
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());

View File

@@ -1,4 +1,6 @@
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread::{self, JoinHandle};
use common::{BinarySerializable, CountingWriter, VInt};
@@ -21,12 +23,19 @@ use crate::DocId;
pub struct StoreWriter {
compressor: Compressor,
block_size: usize,
doc: DocId,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
writer: CountingWriter<WritePtr>,
num_docs_in_current_block: DocId,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
// the channel to send data to the compressor thread.
compressor_sender: SyncSender<BlockCompressorMessage>,
// the handle to check for errors on the thread
compressor_thread_handle: JoinHandle<io::Result<()>>,
}
// Messages sent from the `StoreWriter` (indexing thread) to the
// compressor thread over the sync_channel.
enum BlockCompressorMessage {
// An uncompressed block of serialized documents, to be compressed
// and appended to the store file.
AddBlock(DocumentBlock),
// A whole `StoreReader` whose already-compressed blocks should be
// bulk-copied onto the output without decompressing/recompressing.
Stack(StoreReader),
}
impl StoreWriter {
@@ -34,17 +43,43 @@ impl StoreWriter {
///
/// The store writer will write blocks to disk as
/// documents are added.
pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
StoreWriter {
pub fn new(
writer: WritePtr,
compressor: Compressor,
block_size: usize,
) -> io::Result<StoreWriter> {
let thread_builder = thread::Builder::new().name("docstore-compressor-thread".to_string());
// Channel to send uncompressed data to compressor channel
let (block_sender, block_receiver): (
SyncSender<BlockCompressorMessage>,
Receiver<BlockCompressorMessage>,
) = sync_channel(3);
let thread_join_handle = thread_builder.spawn(move || {
let mut block_compressor = BlockCompressor::new(compressor, writer);
while let Ok(packet) = block_receiver.recv() {
match packet {
BlockCompressorMessage::AddBlock(block) => {
block_compressor.compress_block_and_write(block)?;
}
BlockCompressorMessage::Stack(store_reader) => {
block_compressor.stack(store_reader)?;
}
}
}
block_compressor.close()?;
Ok(())
})?;
Ok(StoreWriter {
compressor,
block_size,
doc: 0,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
writer: CountingWriter::wrap(writer),
num_docs_in_current_block: 0,
intermediary_buffer: Vec::new(),
current_block: Vec::new(),
}
compressor_sender: block_sender,
compressor_thread_handle: thread_join_handle,
})
}
pub(crate) fn compressor(&self) -> Compressor {
@@ -56,21 +91,33 @@ impl StoreWriter {
self.intermediary_buffer.capacity() + self.current_block.capacity()
}
/// Store bytes of a serialized document.
///
/// The document id is implicitly the current number
/// of documents.
pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
let doc_num_bytes = serialized_document.len();
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
self.current_block.write_all(serialized_document)?;
self.doc += 1;
/// Checks if the current block is full, and if so, compresses and flushes it.
fn check_flush_block(&mut self) -> io::Result<()> {
if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
self.send_current_block_to_compressor()?;
}
Ok(())
}
/// Flushes the current uncompressed block (if non-empty) and sends it
/// to the compressor thread.
fn send_current_block_to_compressor(&mut self) -> io::Result<()> {
// We don't do anything if the current block is empty to begin with.
if self.current_block.is_empty() {
return Ok(());
}
let block = DocumentBlock {
// Copy (rather than move) so `current_block` keeps its allocated
// capacity for the documents that follow.
data: self.current_block.to_owned(),
num_docs_in_block: self.num_docs_in_current_block,
};
self.current_block.clear();
self.num_docs_in_current_block = 0;
// `send` only fails if the compressor thread hung up (e.g. it exited
// on an error); surface that as an io::Error.
self.compressor_sender
.send(BlockCompressorMessage::AddBlock(block))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
Ok(())
}
/// Store a new document.
///
/// The document id is implicitly the current number
@@ -82,13 +129,24 @@ impl StoreWriter {
// intermediary_buffer due to the borrow checker
// a new buffer costs ~1% indexing performance
let doc_num_bytes = self.intermediary_buffer.len();
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
self.current_block
.write_all(&self.intermediary_buffer[..])?;
self.doc += 1;
if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
}
self.num_docs_in_current_block += 1;
self.check_flush_block()?;
Ok(())
}
/// Store bytes of a serialized document.
///
/// The document id is implicitly the current number
/// of documents.
pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
let doc_num_bytes = serialized_document.len();
// Each document is appended as a VInt length prefix followed by its bytes.
VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block);
self.current_block.extend_from_slice(serialized_document);
self.num_docs_in_current_block += 1;
// Hands the block over to the compressor thread once it exceeds block_size.
self.check_flush_block()?;
Ok(())
}
@@ -96,12 +154,85 @@ impl StoreWriter {
/// This method is an optimization compared to iterating over the documents
/// in the store and adding them one by one, as the store's data will
/// not be decompressed and then recompressed.
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
/// Stacks the given store reader on top of the documents written so far.
///
/// The current uncompressed block is flushed first so that document order
/// is preserved; the reader itself is forwarded to the compressor thread,
/// which bulk-copies its already-compressed blocks.
pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
// We flush the current block first before stacking
self.send_current_block_to_compressor()?;
self.compressor_sender
.send(BlockCompressorMessage::Stack(store_reader))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
Ok(())
}
/// Finalizes the store writer.
///
/// Sends the last unfinished block (if any) to the compressor thread,
/// then waits for that thread to finish compressing and to serialize
/// the skip list index on disk.
pub fn close(mut self) -> io::Result<()> {
self.send_current_block_to_compressor()?;
// Dropping the sender disconnects the channel, which makes the
// compressor thread's `recv` loop terminate and run its own close().
drop(self.compressor_sender);
// The outer Result from `join` reports a thread panic; the inner
// Result is any io::Error returned by the compressor thread itself.
self.compressor_thread_handle
.join()
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??;
Ok(())
}
}
/// BlockCompressor is separated from StoreWriter, to be run in its own thread.
pub struct BlockCompressor {
// Compression codec applied to each block.
compressor: Compressor,
// Doc id of the first document in the block currently being written.
first_doc_in_block: DocId,
// Builds the skip index mapping doc ranges to byte ranges of the
// compressed blocks; serialized to the writer on close.
offset_index_writer: SkipIndexBuilder,
// Reusable scratch buffer holding the compressed form of one block.
intermediary_buffer: Vec<u8>,
// Destination writer; its running byte count provides block offsets.
writer: CountingWriter<WritePtr>,
}
// An uncompressed block of serialized documents, ready to be compressed.
struct DocumentBlock {
// Concatenation of VInt-length-prefixed serialized documents.
data: Vec<u8>,
// Number of documents serialized in `data`.
num_docs_in_block: DocId,
}
impl BlockCompressor {
fn new(compressor: Compressor, writer: WritePtr) -> Self {
Self {
compressor,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
intermediary_buffer: Vec::new(),
writer: CountingWriter::wrap(writer),
}
assert_eq!(self.first_doc_in_block, self.doc);
let doc_shift = self.doc;
}
/// Compresses one block and appends it to the writer, recording a
/// checkpoint that maps the block's doc range to its byte range.
fn compress_block_and_write(&mut self, block: DocumentBlock) -> io::Result<()> {
// Empty blocks are never sent (the sender skips them), so a zero
// doc count here is a bug.
assert!(block.num_docs_in_block > 0);
self.intermediary_buffer.clear();
self.compressor
.compress_into(&block.data[..], &mut self.intermediary_buffer)?;
// Byte offsets of this compressed block within the store file.
let start_offset = self.writer.written_bytes() as usize;
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes() as usize;
self.register_checkpoint(Checkpoint {
doc_range: self.first_doc_in_block..self.first_doc_in_block + block.num_docs_in_block,
byte_range: start_offset..end_offset,
});
Ok(())
}
// Records the checkpoint in the skip index and advances
// `first_doc_in_block` past the checkpoint's doc range.
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint.clone());
self.first_doc_in_block = checkpoint.doc_range.end;
}
/// Stacks a store reader on top of the documents written so far.
/// This method is an optimization compared to iterating over the documents
/// in the store and adding them one by one, as the store's data will
/// not be decompressed and then recompressed.
fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
let doc_shift = self.first_doc_in_block;
let start_shift = self.writer.written_bytes() as usize;
// just bulk write all of the block of the given reader.
@@ -119,42 +250,13 @@ impl StoreWriter {
}
Ok(())
}
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint.clone());
self.first_doc_in_block = checkpoint.doc_range.end;
self.doc = checkpoint.doc_range.end;
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
self.compressor
.compress_into(&self.current_block[..], &mut self.intermediary_buffer)?;
let start_offset = self.writer.written_bytes() as usize;
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes() as usize;
let end_doc = self.doc;
self.register_checkpoint(Checkpoint {
doc_range: self.first_doc_in_block..end_doc,
byte_range: start_offset..end_offset,
});
self.current_block.clear();
Ok(())
}
/// Finalizes the store writer.
///
/// Compress the last unfinished block if any,
/// and serializes the skip list index on disc.
pub fn close(mut self) -> io::Result<()> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
}
fn close(mut self) -> io::Result<()> {
let header_offset: u64 = self.writer.written_bytes() as u64;
let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
self.offset_index_writer.write(&mut self.writer)?;
footer.serialize(&mut self.writer)?;
let docstore_footer =
DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
self.offset_index_writer.serialize_into(&mut self.writer)?;
docstore_footer.serialize(&mut self.writer)?;
self.writer.terminate()
}
}