use seperate thread to compress block store

Use seperate thread to compress block store for increased indexing performance. This allows to use slower compressors with higher compression ratio, with less or no perfomance impact (with enough cores).

A seperate thread is spawned to compress the docstore, which handles single blocks and stacking from other docstores.
The spawned compressor thread does not write, instead it sends back the compressed data. This is done in order to avoid writing multithreaded on the same file.
This commit is contained in:
Pascal Seitz
2022-06-16 13:06:52 +08:00
parent 93f356a7a7
commit 4c7dedef29
7 changed files with 237 additions and 78 deletions

View File

@@ -1081,7 +1081,7 @@ impl IndexMerger {
store_writer.store_bytes(&doc_bytes)?;
}
} else {
store_writer.stack(&store_reader)?;
store_writer.stack(store_reader)?;
}
}
}

View File

@@ -42,7 +42,7 @@ impl SegmentSerializer {
let blocksize = segment.index().settings().docstore_blocksize;
Ok(SegmentSerializer {
segment,
store_writer: StoreWriter::new(store_write, compressor, blocksize),
store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,

View File

@@ -382,7 +382,7 @@ fn remap_and_write(
let block_size = serializer.segment().index().settings().docstore_blocksize;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
StoreWriter::new(store_write, compressor, block_size),
StoreWriter::new(store_write, compressor, block_size)?,
);
old_store_writer.close()?;
let store_read = StoreReader::open(

View File

@@ -54,7 +54,7 @@ mod tests {
fn test_skip_index_empty() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
@@ -70,7 +70,7 @@ mod tests {
byte_range: 0..3,
};
skip_index_builder.insert(checkpoint.clone());
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
@@ -108,7 +108,7 @@ mod tests {
for checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint.clone());
}
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
assert_eq!(
@@ -167,7 +167,7 @@ mod tests {
for checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint.clone());
}
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
.checkpoints()
@@ -238,7 +238,7 @@ mod tests {
skip_index_builder.insert(checkpoint);
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
skip_index_builder.serialize_into(&mut buffer).unwrap();
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);

View File

@@ -87,7 +87,7 @@ impl SkipIndexBuilder {
}
}
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
pub fn serialize_into<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {

View File

@@ -88,7 +88,7 @@ pub mod tests {
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
for i in 0..num_docs {
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());

View File

@@ -1,6 +1,9 @@
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError};
use std::thread::{self, JoinHandle};
use common::{BinarySerializable, CountingWriter, VInt};
use common::{BinarySerializable, VInt};
use ownedbytes::OwnedBytes;
use super::compressors::Compressor;
use super::footer::DocStoreFooter;
@@ -21,12 +24,23 @@ use crate::DocId;
pub struct StoreWriter {
compressor: Compressor,
block_size: usize,
doc: DocId,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
writer: CountingWriter<WritePtr>,
num_docs_in_current_block: DocId,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
writer: WritePtr,
// the channel to communicate with the compressor thread.
compressor_sender: SyncSender<BlockCompressorMessage>,
// the channel to receive data to write from the compressor thread.
data_receiver: Receiver<OwnedBytes>,
// the handle to check for errors on the thread
compressor_thread_handle: JoinHandle<io::Result<(DocStoreFooter, SkipIndexBuilder)>>,
}
enum BlockCompressorMessage {
AddBlock(DocumentBlock),
Stack((StoreReader, DocumentBlock)),
}
impl StoreWriter {
@@ -34,17 +48,43 @@ impl StoreWriter {
///
/// The store writer will writes blocks on disc as
/// document are added.
pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
StoreWriter {
pub fn new(
writer: WritePtr,
compressor: Compressor,
block_size: usize,
) -> io::Result<StoreWriter> {
let thread_builder = thread::Builder::new().name("docstore compressor thread".to_string());
// Data channel to send fs writes, to write only from current thread
let (data_sender, data_receiver) = sync_channel(3);
// Channel to send uncompressed data to compressor channel
let (block_sender, block_receiver) = sync_channel(3);
let thread_join_handle = thread_builder.spawn(move || {
let mut block_compressor = BlockCompressor::new(compressor, data_sender);
while let Ok(packet) = block_receiver.recv() {
match packet {
BlockCompressorMessage::AddBlock(block) => {
block_compressor.compress_block(block)?;
}
BlockCompressorMessage::Stack((store_reader, block)) => {
block_compressor.stack(block, store_reader)?;
}
}
}
block_compressor.close()
})?;
Ok(StoreWriter {
compressor,
block_size,
doc: 0,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
writer: CountingWriter::wrap(writer),
num_docs_in_current_block: 0,
intermediary_buffer: Vec::new(),
current_block: Vec::new(),
}
writer,
compressor_sender: block_sender,
compressor_thread_handle: thread_join_handle,
data_receiver,
})
}
pub(crate) fn compressor(&self) -> Compressor {
@@ -56,21 +96,53 @@ impl StoreWriter {
self.intermediary_buffer.capacity() + self.current_block.capacity()
}
/// Store bytes of a serialized document.
///
/// The document id is implicitely the current number
/// of documents.
pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
let doc_num_bytes = serialized_document.len();
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
self.current_block.write_all(serialized_document)?;
self.doc += 1;
fn check_flush_block(&mut self) -> io::Result<()> {
if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
let block = self.get_current_block();
self.compressor_sender
.send(BlockCompressorMessage::AddBlock(block))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
self.fetch_writes_from_channel()?;
}
Ok(())
}
/// Try to empty the queue to write into the file.
///
/// This is done in order to avoid writing from multiple threads into the file.
fn fetch_writes_from_channel(&mut self) -> io::Result<()> {
loop {
match self.data_receiver.try_recv() {
Ok(data) => {
self.writer.write_all(data.as_slice())?;
}
Err(err) => match err {
TryRecvError::Empty => {
break;
}
TryRecvError::Disconnected => {
return Err(io::Error::new(
io::ErrorKind::Other,
"compressor data channel unexpected closed".to_string(),
));
}
},
}
}
Ok(())
}
fn get_current_block(&mut self) -> DocumentBlock {
let block = DocumentBlock {
data: self.current_block.to_owned(),
num_docs_in_block: self.num_docs_in_current_block,
};
self.current_block.clear();
self.num_docs_in_current_block = 0;
block
}
/// Store a new document.
///
/// The document id is implicitely the current number
@@ -85,10 +157,21 @@ impl StoreWriter {
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
self.current_block
.write_all(&self.intermediary_buffer[..])?;
self.doc += 1;
if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
}
self.num_docs_in_current_block += 1;
self.check_flush_block()?;
Ok(())
}
/// Store bytes of a serialized document.
///
/// The document id is implicitely the current number
/// of documents.
pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
let doc_num_bytes = serialized_document.len();
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
self.current_block.write_all(serialized_document)?;
self.num_docs_in_current_block += 1;
self.check_flush_block()?;
Ok(())
}
@@ -96,17 +179,123 @@ impl StoreWriter {
/// This method is an optimization compared to iterating over the documents
/// in the store and adding them one by one, as the store's data will
/// not be decompressed and then recompressed.
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
self.check_flush_block()?;
let block = self.get_current_block();
self.compressor_sender
.send(BlockCompressorMessage::Stack((store_reader, block)))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
Ok(())
}
/// Finalized the store writer.
///
/// Compress the last unfinished block if any,
/// and serializes the skip list index on disc.
pub fn close(mut self) -> io::Result<()> {
let block = self.get_current_block();
if !block.is_empty() {
self.compressor_sender
.send(BlockCompressorMessage::AddBlock(block))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
}
drop(self.compressor_sender);
// Wait for remaining data on the channel to write
while let Ok(data) = self.data_receiver.recv() {
self.writer.write_all(data.as_slice())?;
}
// The compressor thread should have finished already, since data_receiver stopped
// receiving
let (docstore_footer, offset_index_writer) = self
.compressor_thread_handle
.join()
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??;
offset_index_writer.serialize_into(&mut self.writer)?;
docstore_footer.serialize(&mut self.writer)?;
self.writer.terminate()
}
}
/// BlockCompressor is seperated from StoreWriter, to be run in an own thread
pub struct BlockCompressor {
compressor: Compressor,
doc: DocId,
offset_index_writer: SkipIndexBuilder,
intermediary_buffer: Vec<u8>,
written_bytes: usize,
data_sender: SyncSender<OwnedBytes>,
}
struct DocumentBlock {
data: Vec<u8>,
num_docs_in_block: DocId,
}
impl DocumentBlock {
fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
impl BlockCompressor {
fn new(compressor: Compressor, data_sender: SyncSender<OwnedBytes>) -> Self {
Self {
compressor,
doc: 0,
offset_index_writer: SkipIndexBuilder::new(),
intermediary_buffer: Vec::new(),
written_bytes: 0,
data_sender,
}
}
fn compress_block(&mut self, block: DocumentBlock) -> io::Result<()> {
assert!(block.num_docs_in_block > 0);
self.intermediary_buffer.clear();
self.compressor
.compress_into(&block.data[..], &mut self.intermediary_buffer)?;
let byte_range = self.written_bytes..self.written_bytes + self.intermediary_buffer.len();
self.data_sender
.send(OwnedBytes::new(self.intermediary_buffer.to_owned()))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
self.written_bytes += byte_range.len();
self.register_checkpoint(Checkpoint {
doc_range: self.doc..self.doc + block.num_docs_in_block,
byte_range,
});
Ok(())
}
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint.clone());
self.doc = checkpoint.doc_range.end;
}
/// Stacks a store reader on top of the documents written so far.
/// This method is an optimization compared to iterating over the documents
/// in the store and adding them one by one, as the store's data will
/// not be decompressed and then recompressed.
fn stack(&mut self, block: DocumentBlock, store_reader: StoreReader) -> io::Result<()> {
if !block.is_empty() {
self.compress_block(block)?;
}
assert_eq!(self.first_doc_in_block, self.doc);
let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as usize;
let start_shift = self.written_bytes;
// just bulk write all of the block of the given reader.
self.writer
.write_all(store_reader.block_data()?.as_slice())?;
self.data_sender
.send(store_reader.block_data()?)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
self.written_bytes += store_reader.block_data()?.as_slice().len();
// concatenate the index of the `store_reader`, after translating
// its start doc id and its start file offset.
@@ -119,42 +308,12 @@ impl StoreWriter {
}
Ok(())
}
fn close(self) -> io::Result<(DocStoreFooter, SkipIndexBuilder)> {
drop(self.data_sender);
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
self.offset_index_writer.insert(checkpoint.clone());
self.first_doc_in_block = checkpoint.doc_range.end;
self.doc = checkpoint.doc_range.end;
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
self.compressor
.compress_into(&self.current_block[..], &mut self.intermediary_buffer)?;
let start_offset = self.writer.written_bytes() as usize;
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes() as usize;
let end_doc = self.doc;
self.register_checkpoint(Checkpoint {
doc_range: self.first_doc_in_block..end_doc,
byte_range: start_offset..end_offset,
});
self.current_block.clear();
Ok(())
}
/// Finalized the store writer.
///
/// Compress the last unfinished block if any,
/// and serializes the skip list index on disc.
pub fn close(mut self) -> io::Result<()> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
}
let header_offset: u64 = self.writer.written_bytes() as u64;
let header_offset: u64 = self.written_bytes as u64;
let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
self.offset_index_writer.write(&mut self.writer)?;
footer.serialize(&mut self.writer)?;
self.writer.terminate()
Ok((footer, self.offset_index_writer))
}
}