mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-05 01:50:42 +00:00
Allow for a same-thread doc compressor. (#1510)
In addition, it isolates the doc compressor logic, better reports io::Result. In the case of the same-thread doc compressor, the blocks are also not copied.
This commit is contained in:
@@ -235,6 +235,14 @@ impl InnerSegmentMeta {
|
||||
}
|
||||
}
|
||||
|
||||
fn return_true() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn is_true(val: &bool) -> bool {
|
||||
*val
|
||||
}
|
||||
|
||||
/// Search Index Settings.
|
||||
///
|
||||
/// Contains settings which are applied on the whole
|
||||
@@ -248,6 +256,12 @@ pub struct IndexSettings {
|
||||
/// The `Compressor` used to compress the doc store.
|
||||
#[serde(default)]
|
||||
pub docstore_compression: Compressor,
|
||||
/// If set to true, docstore compression will happen on a dedicated thread.
|
||||
/// (defaults: true)
|
||||
#[doc(hidden)]
|
||||
#[serde(default = "return_true")]
|
||||
#[serde(skip_serializing_if = "is_true")]
|
||||
pub docstore_compress_dedicated_thread: bool,
|
||||
#[serde(default = "default_docstore_blocksize")]
|
||||
/// The size of each block that will be compressed and written to disk
|
||||
pub docstore_blocksize: usize,
|
||||
@@ -264,6 +278,7 @@ impl Default for IndexSettings {
|
||||
sort_by_field: None,
|
||||
docstore_compression: Compressor::default(),
|
||||
docstore_blocksize: default_docstore_blocksize(),
|
||||
docstore_compress_dedicated_thread: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -395,7 +410,7 @@ mod tests {
|
||||
use super::IndexMeta;
|
||||
use crate::core::index_meta::UntrackedIndexMeta;
|
||||
use crate::schema::{Schema, TEXT};
|
||||
use crate::store::ZstdCompressor;
|
||||
use crate::store::{Compressor, ZstdCompressor};
|
||||
use crate::{IndexSettings, IndexSortByField, Order};
|
||||
|
||||
#[test]
|
||||
@@ -447,6 +462,7 @@ mod tests {
|
||||
compression_level: Some(4),
|
||||
}),
|
||||
docstore_blocksize: 1_000_000,
|
||||
docstore_compress_dedicated_thread: true,
|
||||
},
|
||||
segments: Vec::new(),
|
||||
schema,
|
||||
@@ -485,4 +501,47 @@ mod tests {
|
||||
"unknown zstd option \"bla\" at line 1 column 103".to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(feature = "lz4-compression")]
|
||||
fn test_index_settings_default() {
|
||||
let mut index_settings = IndexSettings::default();
|
||||
assert_eq!(
|
||||
index_settings,
|
||||
IndexSettings {
|
||||
sort_by_field: None,
|
||||
docstore_compression: Compressor::default(),
|
||||
docstore_compress_dedicated_thread: true,
|
||||
docstore_blocksize: 16_384
|
||||
}
|
||||
);
|
||||
{
|
||||
let index_settings_json = serde_json::to_value(&index_settings).unwrap();
|
||||
assert_eq!(
|
||||
index_settings_json,
|
||||
serde_json::json!({
|
||||
"docstore_compression": "lz4",
|
||||
"docstore_blocksize": 16384
|
||||
})
|
||||
);
|
||||
let index_settings_deser: IndexSettings =
|
||||
serde_json::from_value(index_settings_json).unwrap();
|
||||
assert_eq!(index_settings_deser, index_settings);
|
||||
}
|
||||
{
|
||||
index_settings.docstore_compress_dedicated_thread = false;
|
||||
let index_settings_json = serde_json::to_value(&index_settings).unwrap();
|
||||
assert_eq!(
|
||||
index_settings_json,
|
||||
serde_json::json!({
|
||||
"docstore_compression": "lz4",
|
||||
"docstore_blocksize": 16384,
|
||||
"docstore_compress_dedicated_thread": false,
|
||||
})
|
||||
);
|
||||
let index_settings_deser: IndexSettings =
|
||||
serde_json::from_value(index_settings_json).unwrap();
|
||||
assert_eq!(index_settings_deser, index_settings);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,11 +38,16 @@ impl SegmentSerializer {
|
||||
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;
|
||||
|
||||
let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
|
||||
let compressor = segment.index().settings().docstore_compression;
|
||||
let blocksize = segment.index().settings().docstore_blocksize;
|
||||
let settings = segment.index().settings();
|
||||
let store_writer = StoreWriter::new(
|
||||
store_write,
|
||||
settings.docstore_compression,
|
||||
settings.docstore_blocksize,
|
||||
settings.docstore_compress_dedicated_thread,
|
||||
)?;
|
||||
Ok(SegmentSerializer {
|
||||
segment,
|
||||
store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
|
||||
store_writer,
|
||||
fast_field_serializer,
|
||||
fieldnorms_serializer: Some(fieldnorms_serializer),
|
||||
postings_serializer,
|
||||
|
||||
@@ -380,12 +380,14 @@ fn remap_and_write(
|
||||
let store_write = serializer
|
||||
.segment_mut()
|
||||
.open_write(SegmentComponent::Store)?;
|
||||
let compressor = serializer.segment().index().settings().docstore_compression;
|
||||
let block_size = serializer.segment().index().settings().docstore_blocksize;
|
||||
let old_store_writer = std::mem::replace(
|
||||
&mut serializer.store_writer,
|
||||
StoreWriter::new(store_write, compressor, block_size)?,
|
||||
);
|
||||
let settings = serializer.segment().index().settings();
|
||||
let store_writer = StoreWriter::new(
|
||||
store_write,
|
||||
settings.docstore_compression,
|
||||
settings.docstore_blocksize,
|
||||
settings.docstore_compress_dedicated_thread,
|
||||
)?;
|
||||
let old_store_writer = std::mem::replace(&mut serializer.store_writer, store_writer);
|
||||
old_store_writer.close()?;
|
||||
let store_read = StoreReader::open(
|
||||
serializer
|
||||
|
||||
@@ -43,6 +43,7 @@ pub use self::decompressors::Decompressor;
|
||||
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
|
||||
pub use self::reader::{CacheStats, StoreReader};
|
||||
pub use self::writer::StoreWriter;
|
||||
mod store_compressor;
|
||||
|
||||
#[cfg(feature = "lz4-compression")]
|
||||
mod compression_lz4_block;
|
||||
@@ -82,6 +83,7 @@ pub mod tests {
|
||||
num_docs: usize,
|
||||
compressor: Compressor,
|
||||
blocksize: usize,
|
||||
separate_thread: bool,
|
||||
) -> Schema {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
|
||||
@@ -89,7 +91,8 @@ pub mod tests {
|
||||
schema_builder.add_text_field("title", TextOptions::default().set_stored());
|
||||
let schema = schema_builder.build();
|
||||
{
|
||||
let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
|
||||
let mut store_writer =
|
||||
StoreWriter::new(writer, compressor, blocksize, separate_thread).unwrap();
|
||||
for i in 0..num_docs {
|
||||
let mut doc = Document::default();
|
||||
doc.add_field_value(field_body, LOREM.to_string());
|
||||
@@ -112,7 +115,8 @@ pub mod tests {
|
||||
let path = Path::new("store");
|
||||
let directory = RamDirectory::create();
|
||||
let store_wrt = directory.open_write(path)?;
|
||||
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
|
||||
let schema =
|
||||
write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true);
|
||||
let field_title = schema.get_field("title").unwrap();
|
||||
let store_file = directory.open_read(path)?;
|
||||
let store = StoreReader::open(store_file, 10)?;
|
||||
@@ -148,11 +152,16 @@ pub mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
|
||||
fn test_store(
|
||||
compressor: Compressor,
|
||||
blocksize: usize,
|
||||
separate_thread: bool,
|
||||
) -> crate::Result<()> {
|
||||
let path = Path::new("store");
|
||||
let directory = RamDirectory::create();
|
||||
let store_wrt = directory.open_write(path)?;
|
||||
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
|
||||
let schema =
|
||||
write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread);
|
||||
let field_title = schema.get_field("title").unwrap();
|
||||
let store_file = directory.open_read(path)?;
|
||||
let store = StoreReader::open(store_file, 10)?;
|
||||
@@ -177,29 +186,39 @@ pub mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_store_noop() -> crate::Result<()> {
|
||||
test_store(Compressor::None, BLOCK_SIZE)
|
||||
fn test_store_no_compression_same_thread() -> crate::Result<()> {
|
||||
test_store(Compressor::None, BLOCK_SIZE, false)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_store_no_compression() -> crate::Result<()> {
|
||||
test_store(Compressor::None, BLOCK_SIZE, true)
|
||||
}
|
||||
|
||||
#[cfg(feature = "lz4-compression")]
|
||||
#[test]
|
||||
fn test_store_lz4_block() -> crate::Result<()> {
|
||||
test_store(Compressor::Lz4, BLOCK_SIZE)
|
||||
test_store(Compressor::Lz4, BLOCK_SIZE, true)
|
||||
}
|
||||
#[cfg(feature = "snappy-compression")]
|
||||
#[test]
|
||||
fn test_store_snap() -> crate::Result<()> {
|
||||
test_store(Compressor::Snappy, BLOCK_SIZE)
|
||||
test_store(Compressor::Snappy, BLOCK_SIZE, true)
|
||||
}
|
||||
#[cfg(feature = "brotli-compression")]
|
||||
#[test]
|
||||
fn test_store_brotli() -> crate::Result<()> {
|
||||
test_store(Compressor::Brotli, BLOCK_SIZE)
|
||||
test_store(Compressor::Brotli, BLOCK_SIZE, true)
|
||||
}
|
||||
|
||||
#[cfg(feature = "zstd-compression")]
|
||||
#[test]
|
||||
fn test_store_zstd() -> crate::Result<()> {
|
||||
test_store(Compressor::Zstd(ZstdCompressor::default()), BLOCK_SIZE)
|
||||
test_store(
|
||||
Compressor::Zstd(ZstdCompressor::default()),
|
||||
BLOCK_SIZE,
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -364,6 +383,7 @@ mod bench {
|
||||
1_000,
|
||||
Compressor::default(),
|
||||
16_384,
|
||||
true,
|
||||
);
|
||||
directory.delete(path).unwrap();
|
||||
});
|
||||
@@ -378,6 +398,7 @@ mod bench {
|
||||
1_000,
|
||||
Compressor::default(),
|
||||
16_384,
|
||||
true,
|
||||
);
|
||||
let store_file = directory.open_read(path).unwrap();
|
||||
let store = StoreReader::open(store_file, 10).unwrap();
|
||||
|
||||
@@ -393,7 +393,7 @@ mod tests {
|
||||
let directory = RamDirectory::create();
|
||||
let path = Path::new("store");
|
||||
let writer = directory.open_write(path)?;
|
||||
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
|
||||
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE, true);
|
||||
let title = schema.get_field("title").unwrap();
|
||||
let store_file = directory.open_read(path)?;
|
||||
let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?;
|
||||
|
||||
269
src/store/store_compressor.rs
Normal file
269
src/store/store_compressor.rs
Normal file
@@ -0,0 +1,269 @@
|
||||
use std::io::Write;
|
||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
|
||||
use std::thread::JoinHandle;
|
||||
use std::{io, thread};
|
||||
|
||||
use common::{BinarySerializable, CountingWriter, TerminatingWrite};
|
||||
|
||||
use crate::directory::WritePtr;
|
||||
use crate::store::footer::DocStoreFooter;
|
||||
use crate::store::index::{Checkpoint, SkipIndexBuilder};
|
||||
use crate::store::{Compressor, Decompressor, StoreReader};
|
||||
use crate::DocId;
|
||||
|
||||
pub struct BlockCompressor(BlockCompressorVariants);
|
||||
|
||||
// The struct wrapping an enum is just here to keep the
|
||||
// impls private.
|
||||
enum BlockCompressorVariants {
|
||||
SameThread(BlockCompressorImpl),
|
||||
DedicatedThread(DedicatedThreadBlockCompressorImpl),
|
||||
}
|
||||
|
||||
impl BlockCompressor {
|
||||
pub fn new(compressor: Compressor, wrt: WritePtr, dedicated_thread: bool) -> io::Result<Self> {
|
||||
let block_compressor_impl = BlockCompressorImpl::new(compressor, wrt);
|
||||
if dedicated_thread {
|
||||
let dedicated_thread_compressor =
|
||||
DedicatedThreadBlockCompressorImpl::new(block_compressor_impl)?;
|
||||
Ok(BlockCompressor(BlockCompressorVariants::DedicatedThread(
|
||||
dedicated_thread_compressor,
|
||||
)))
|
||||
} else {
|
||||
Ok(BlockCompressor(BlockCompressorVariants::SameThread(
|
||||
block_compressor_impl,
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compress_block_and_write(
|
||||
&mut self,
|
||||
bytes: &[u8],
|
||||
num_docs_in_block: u32,
|
||||
) -> io::Result<()> {
|
||||
match &mut self.0 {
|
||||
BlockCompressorVariants::SameThread(block_compressor) => {
|
||||
block_compressor.compress_block_and_write(bytes, num_docs_in_block)?;
|
||||
}
|
||||
BlockCompressorVariants::DedicatedThread(different_thread_block_compressor) => {
|
||||
different_thread_block_compressor
|
||||
.compress_block_and_write(bytes, num_docs_in_block)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> {
|
||||
match &mut self.0 {
|
||||
BlockCompressorVariants::SameThread(block_compressor) => {
|
||||
block_compressor.stack(store_reader)?;
|
||||
}
|
||||
BlockCompressorVariants::DedicatedThread(different_thread_block_compressor) => {
|
||||
different_thread_block_compressor.stack_reader(store_reader)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn close(self) -> io::Result<()> {
|
||||
let imp = self.0;
|
||||
match imp {
|
||||
BlockCompressorVariants::SameThread(block_compressor) => block_compressor.close(),
|
||||
BlockCompressorVariants::DedicatedThread(different_thread_block_compressor) => {
|
||||
different_thread_block_compressor.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct BlockCompressorImpl {
|
||||
compressor: Compressor,
|
||||
first_doc_in_block: DocId,
|
||||
offset_index_writer: SkipIndexBuilder,
|
||||
intermediary_buffer: Vec<u8>,
|
||||
writer: CountingWriter<WritePtr>,
|
||||
}
|
||||
|
||||
impl BlockCompressorImpl {
|
||||
fn new(compressor: Compressor, writer: WritePtr) -> Self {
|
||||
Self {
|
||||
compressor,
|
||||
first_doc_in_block: 0,
|
||||
offset_index_writer: SkipIndexBuilder::new(),
|
||||
intermediary_buffer: Vec::new(),
|
||||
writer: CountingWriter::wrap(writer),
|
||||
}
|
||||
}
|
||||
|
||||
fn compress_block_and_write(&mut self, data: &[u8], num_docs_in_block: u32) -> io::Result<()> {
|
||||
assert!(num_docs_in_block > 0);
|
||||
self.intermediary_buffer.clear();
|
||||
self.compressor
|
||||
.compress_into(data, &mut self.intermediary_buffer)?;
|
||||
|
||||
let start_offset = self.writer.written_bytes() as usize;
|
||||
self.writer.write_all(&self.intermediary_buffer)?;
|
||||
let end_offset = self.writer.written_bytes() as usize;
|
||||
|
||||
self.register_checkpoint(Checkpoint {
|
||||
doc_range: self.first_doc_in_block..self.first_doc_in_block + num_docs_in_block,
|
||||
byte_range: start_offset..end_offset,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
|
||||
self.offset_index_writer.insert(checkpoint.clone());
|
||||
self.first_doc_in_block = checkpoint.doc_range.end;
|
||||
}
|
||||
|
||||
/// Stacks a store reader on top of the documents written so far.
|
||||
/// This method is an optimization compared to iterating over the documents
|
||||
/// in the store and adding them one by one, as the store's data will
|
||||
/// not be decompressed and then recompressed.
|
||||
fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
|
||||
let doc_shift = self.first_doc_in_block;
|
||||
let start_shift = self.writer.written_bytes() as usize;
|
||||
|
||||
// just bulk write all of the block of the given reader.
|
||||
self.writer
|
||||
.write_all(store_reader.block_data()?.as_slice())?;
|
||||
|
||||
// concatenate the index of the `store_reader`, after translating
|
||||
// its start doc id and its start file offset.
|
||||
for mut checkpoint in store_reader.block_checkpoints() {
|
||||
checkpoint.doc_range.start += doc_shift;
|
||||
checkpoint.doc_range.end += doc_shift;
|
||||
checkpoint.byte_range.start += start_shift;
|
||||
checkpoint.byte_range.end += start_shift;
|
||||
self.register_checkpoint(checkpoint);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn close(mut self) -> io::Result<()> {
|
||||
let header_offset: u64 = self.writer.written_bytes() as u64;
|
||||
let docstore_footer =
|
||||
DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
|
||||
self.offset_index_writer.serialize_into(&mut self.writer)?;
|
||||
docstore_footer.serialize(&mut self.writer)?;
|
||||
self.writer.terminate()
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------
|
||||
enum BlockCompressorMessage {
|
||||
CompressBlockAndWrite {
|
||||
block_data: Vec<u8>,
|
||||
num_docs_in_block: u32,
|
||||
},
|
||||
Stack(StoreReader),
|
||||
}
|
||||
|
||||
struct DedicatedThreadBlockCompressorImpl {
|
||||
join_handle: Option<JoinHandle<io::Result<()>>>,
|
||||
tx: SyncSender<BlockCompressorMessage>,
|
||||
}
|
||||
|
||||
impl DedicatedThreadBlockCompressorImpl {
|
||||
fn new(mut block_compressor: BlockCompressorImpl) -> io::Result<Self> {
|
||||
let (tx, rx): (
|
||||
SyncSender<BlockCompressorMessage>,
|
||||
Receiver<BlockCompressorMessage>,
|
||||
) = sync_channel(3);
|
||||
let join_handle = thread::Builder::new()
|
||||
.name("docstore-compressor-thread".to_string())
|
||||
.spawn(move || {
|
||||
while let Ok(packet) = rx.recv() {
|
||||
match packet {
|
||||
BlockCompressorMessage::CompressBlockAndWrite {
|
||||
block_data,
|
||||
num_docs_in_block,
|
||||
} => {
|
||||
block_compressor
|
||||
.compress_block_and_write(&block_data[..], num_docs_in_block)?;
|
||||
}
|
||||
BlockCompressorMessage::Stack(store_reader) => {
|
||||
block_compressor.stack(store_reader)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
block_compressor.close()?;
|
||||
Ok(())
|
||||
})?;
|
||||
Ok(DedicatedThreadBlockCompressorImpl {
|
||||
join_handle: Some(join_handle),
|
||||
tx,
|
||||
})
|
||||
}
|
||||
|
||||
fn compress_block_and_write(&mut self, bytes: &[u8], num_docs_in_block: u32) -> io::Result<()> {
|
||||
self.send(BlockCompressorMessage::CompressBlockAndWrite {
|
||||
block_data: bytes.to_vec(),
|
||||
num_docs_in_block,
|
||||
})
|
||||
}
|
||||
|
||||
fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> {
|
||||
self.send(BlockCompressorMessage::Stack(store_reader))
|
||||
}
|
||||
|
||||
fn send(&mut self, msg: BlockCompressorMessage) -> io::Result<()> {
|
||||
if self.tx.send(msg).is_err() {
|
||||
harvest_thread_result(self.join_handle.take())?;
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "Unidentified error."));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn close(self) -> io::Result<()> {
|
||||
drop(self.tx);
|
||||
harvest_thread_result(self.join_handle)
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the thread result to terminate and returns its result.
|
||||
///
|
||||
/// If the thread panicked, or if the result has already been harvested,
|
||||
/// returns an explicit error.
|
||||
fn harvest_thread_result(join_handle_opt: Option<JoinHandle<io::Result<()>>>) -> io::Result<()> {
|
||||
let join_handle = join_handle_opt
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Thread already joined."))?;
|
||||
join_handle
|
||||
.join()
|
||||
.map_err(|_err| io::Error::new(io::ErrorKind::Other, "Compressing thread panicked."))?
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::directory::RamDirectory;
|
||||
use crate::store::store_compressor::BlockCompressor;
|
||||
use crate::store::Compressor;
|
||||
use crate::Directory;
|
||||
|
||||
fn populate_block_compressor(mut block_compressor: BlockCompressor) -> io::Result<()> {
|
||||
block_compressor.compress_block_and_write(b"hello", 1)?;
|
||||
block_compressor.compress_block_and_write(b"happy", 1)?;
|
||||
block_compressor.close()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_block_store_compressor_impls_yield_the_same_result() {
|
||||
let ram_directory = RamDirectory::default();
|
||||
let path1 = Path::new("path1");
|
||||
let path2 = Path::new("path2");
|
||||
let wrt1 = ram_directory.open_write(path1).unwrap();
|
||||
let wrt2 = ram_directory.open_write(path2).unwrap();
|
||||
let block_compressor1 = BlockCompressor::new(Compressor::None, wrt1, true).unwrap();
|
||||
let block_compressor2 = BlockCompressor::new(Compressor::None, wrt2, false).unwrap();
|
||||
populate_block_compressor(block_compressor1).unwrap();
|
||||
populate_block_compressor(block_compressor2).unwrap();
|
||||
let data1 = ram_directory.open_read(path1).unwrap();
|
||||
let data2 = ram_directory.open_read(path2).unwrap();
|
||||
assert_eq!(data1.read_bytes().unwrap(), data2.read_bytes().unwrap());
|
||||
}
|
||||
}
|
||||
@@ -1,16 +1,12 @@
|
||||
use std::io::{self, Write};
|
||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
|
||||
use std::thread::{self, JoinHandle};
|
||||
|
||||
use common::{BinarySerializable, CountingWriter, VInt};
|
||||
use common::{BinarySerializable, VInt};
|
||||
|
||||
use super::compressors::Compressor;
|
||||
use super::footer::DocStoreFooter;
|
||||
use super::index::SkipIndexBuilder;
|
||||
use super::{Decompressor, StoreReader};
|
||||
use crate::directory::{TerminatingWrite, WritePtr};
|
||||
use super::StoreReader;
|
||||
use crate::directory::WritePtr;
|
||||
use crate::schema::Document;
|
||||
use crate::store::index::Checkpoint;
|
||||
use crate::store::store_compressor::BlockCompressor;
|
||||
use crate::DocId;
|
||||
|
||||
/// Write tantivy's [`Store`](./index.html)
|
||||
@@ -26,16 +22,7 @@ pub struct StoreWriter {
|
||||
num_docs_in_current_block: DocId,
|
||||
intermediary_buffer: Vec<u8>,
|
||||
current_block: Vec<u8>,
|
||||
|
||||
// the channel to send data to the compressor thread.
|
||||
compressor_sender: SyncSender<BlockCompressorMessage>,
|
||||
// the handle to check for errors on the thread
|
||||
compressor_thread_handle: JoinHandle<io::Result<()>>,
|
||||
}
|
||||
|
||||
enum BlockCompressorMessage {
|
||||
AddBlock(DocumentBlock),
|
||||
Stack(StoreReader),
|
||||
block_compressor: BlockCompressor,
|
||||
}
|
||||
|
||||
impl StoreWriter {
|
||||
@@ -47,38 +34,16 @@ impl StoreWriter {
|
||||
writer: WritePtr,
|
||||
compressor: Compressor,
|
||||
block_size: usize,
|
||||
dedicated_thread: bool,
|
||||
) -> io::Result<StoreWriter> {
|
||||
let thread_builder = thread::Builder::new().name("docstore-compressor-thread".to_string());
|
||||
|
||||
// Channel to send uncompressed data to compressor channel
|
||||
let (block_sender, block_receiver): (
|
||||
SyncSender<BlockCompressorMessage>,
|
||||
Receiver<BlockCompressorMessage>,
|
||||
) = sync_channel(3);
|
||||
let thread_join_handle = thread_builder.spawn(move || {
|
||||
let mut block_compressor = BlockCompressor::new(compressor, writer);
|
||||
while let Ok(packet) = block_receiver.recv() {
|
||||
match packet {
|
||||
BlockCompressorMessage::AddBlock(block) => {
|
||||
block_compressor.compress_block_and_write(block)?;
|
||||
}
|
||||
BlockCompressorMessage::Stack(store_reader) => {
|
||||
block_compressor.stack(store_reader)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
block_compressor.close()?;
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
let block_compressor = BlockCompressor::new(compressor, writer, dedicated_thread)?;
|
||||
Ok(StoreWriter {
|
||||
compressor,
|
||||
block_size,
|
||||
num_docs_in_current_block: 0,
|
||||
intermediary_buffer: Vec::new(),
|
||||
current_block: Vec::new(),
|
||||
compressor_sender: block_sender,
|
||||
compressor_thread_handle: thread_join_handle,
|
||||
block_compressor,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -105,16 +70,10 @@ impl StoreWriter {
|
||||
if self.current_block.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let block = DocumentBlock {
|
||||
data: self.current_block.to_owned(),
|
||||
num_docs_in_block: self.num_docs_in_current_block,
|
||||
};
|
||||
self.block_compressor
|
||||
.compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?;
|
||||
self.current_block.clear();
|
||||
self.num_docs_in_current_block = 0;
|
||||
self.compressor_sender
|
||||
.send(BlockCompressorMessage::AddBlock(block))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -157,10 +116,7 @@ impl StoreWriter {
|
||||
pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
|
||||
// We flush the current block first before stacking
|
||||
self.send_current_block_to_compressor()?;
|
||||
self.compressor_sender
|
||||
.send(BlockCompressorMessage::Stack(store_reader))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
|
||||
self.block_compressor.stack_reader(store_reader)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -170,93 +126,7 @@ impl StoreWriter {
|
||||
/// and serializes the skip list index on disc.
|
||||
pub fn close(mut self) -> io::Result<()> {
|
||||
self.send_current_block_to_compressor()?;
|
||||
drop(self.compressor_sender);
|
||||
|
||||
self.compressor_thread_handle
|
||||
.join()
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??;
|
||||
|
||||
self.block_compressor.close()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// BlockCompressor is separated from StoreWriter, to be run in an own thread
|
||||
pub struct BlockCompressor {
|
||||
compressor: Compressor,
|
||||
first_doc_in_block: DocId,
|
||||
offset_index_writer: SkipIndexBuilder,
|
||||
intermediary_buffer: Vec<u8>,
|
||||
writer: CountingWriter<WritePtr>,
|
||||
}
|
||||
|
||||
struct DocumentBlock {
|
||||
data: Vec<u8>,
|
||||
num_docs_in_block: DocId,
|
||||
}
|
||||
|
||||
impl BlockCompressor {
|
||||
fn new(compressor: Compressor, writer: WritePtr) -> Self {
|
||||
Self {
|
||||
compressor,
|
||||
first_doc_in_block: 0,
|
||||
offset_index_writer: SkipIndexBuilder::new(),
|
||||
intermediary_buffer: Vec::new(),
|
||||
writer: CountingWriter::wrap(writer),
|
||||
}
|
||||
}
|
||||
|
||||
fn compress_block_and_write(&mut self, block: DocumentBlock) -> io::Result<()> {
|
||||
assert!(block.num_docs_in_block > 0);
|
||||
self.intermediary_buffer.clear();
|
||||
self.compressor
|
||||
.compress_into(&block.data[..], &mut self.intermediary_buffer)?;
|
||||
|
||||
let start_offset = self.writer.written_bytes() as usize;
|
||||
self.writer.write_all(&self.intermediary_buffer)?;
|
||||
let end_offset = self.writer.written_bytes() as usize;
|
||||
|
||||
self.register_checkpoint(Checkpoint {
|
||||
doc_range: self.first_doc_in_block..self.first_doc_in_block + block.num_docs_in_block,
|
||||
byte_range: start_offset..end_offset,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
|
||||
self.offset_index_writer.insert(checkpoint.clone());
|
||||
self.first_doc_in_block = checkpoint.doc_range.end;
|
||||
}
|
||||
|
||||
/// Stacks a store reader on top of the documents written so far.
|
||||
/// This method is an optimization compared to iterating over the documents
|
||||
/// in the store and adding them one by one, as the store's data will
|
||||
/// not be decompressed and then recompressed.
|
||||
fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
|
||||
let doc_shift = self.first_doc_in_block;
|
||||
let start_shift = self.writer.written_bytes() as usize;
|
||||
|
||||
// just bulk write all of the block of the given reader.
|
||||
self.writer
|
||||
.write_all(store_reader.block_data()?.as_slice())?;
|
||||
|
||||
// concatenate the index of the `store_reader`, after translating
|
||||
// its start doc id and its start file offset.
|
||||
for mut checkpoint in store_reader.block_checkpoints() {
|
||||
checkpoint.doc_range.start += doc_shift;
|
||||
checkpoint.doc_range.end += doc_shift;
|
||||
checkpoint.byte_range.start += start_shift;
|
||||
checkpoint.byte_range.end += start_shift;
|
||||
self.register_checkpoint(checkpoint);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
fn close(mut self) -> io::Result<()> {
|
||||
let header_offset: u64 = self.writer.written_bytes() as u64;
|
||||
let docstore_footer =
|
||||
DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
|
||||
|
||||
self.offset_index_writer.serialize_into(&mut self.writer)?;
|
||||
docstore_footer.serialize(&mut self.writer)?;
|
||||
self.writer.terminate()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user