mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-04 09:30:42 +00:00
enable setting compression level
This commit is contained in:
@@ -251,6 +251,11 @@ pub struct IndexSettings {
|
||||
#[serde(default = "default_docstore_blocksize")]
|
||||
/// The size of each block that will be compressed and written to disk
|
||||
pub docstore_blocksize: usize,
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
/// The compression level, which will be forwarded to the underlying compressor (if
|
||||
/// applicaple).
|
||||
pub docstore_compression_level: Option<i32>,
|
||||
}
|
||||
|
||||
/// Must be a function to be compatible with serde defaults
|
||||
@@ -264,6 +269,7 @@ impl Default for IndexSettings {
|
||||
sort_by_field: None,
|
||||
docstore_compression: Compressor::default(),
|
||||
docstore_blocksize: default_docstore_blocksize(),
|
||||
docstore_compression_level: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,9 +40,10 @@ impl SegmentSerializer {
|
||||
let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
|
||||
let compressor = segment.index().settings().docstore_compression;
|
||||
let blocksize = segment.index().settings().docstore_blocksize;
|
||||
let compression_level = segment.index().settings().docstore_compression_level;
|
||||
Ok(SegmentSerializer {
|
||||
segment,
|
||||
store_writer: StoreWriter::new(store_write, compressor, blocksize),
|
||||
store_writer: StoreWriter::new(store_write, compressor, blocksize, compression_level),
|
||||
fast_field_serializer,
|
||||
fieldnorms_serializer: Some(fieldnorms_serializer),
|
||||
postings_serializer,
|
||||
|
||||
@@ -373,9 +373,14 @@ fn remap_and_write(
|
||||
.open_write(SegmentComponent::Store)?;
|
||||
let compressor = serializer.segment().index().settings().docstore_compression;
|
||||
let block_size = serializer.segment().index().settings().docstore_blocksize;
|
||||
let compression_level = serializer
|
||||
.segment()
|
||||
.index()
|
||||
.settings()
|
||||
.docstore_compression_level;
|
||||
let old_store_writer = std::mem::replace(
|
||||
&mut serializer.store_writer,
|
||||
StoreWriter::new(store_write, compressor, block_size),
|
||||
StoreWriter::new(store_write, compressor, block_size, compression_level),
|
||||
);
|
||||
old_store_writer.close()?;
|
||||
let store_read = StoreReader::open(
|
||||
|
||||
@@ -4,7 +4,11 @@ use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
|
||||
use zstd::DEFAULT_COMPRESSION_LEVEL;
|
||||
|
||||
#[inline]
|
||||
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
|
||||
pub fn compress(
|
||||
uncompressed: &[u8],
|
||||
compressed: &mut Vec<u8>,
|
||||
compression_level: Option<i32>,
|
||||
) -> io::Result<()> {
|
||||
let count_size = std::mem::size_of::<u32>();
|
||||
let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;
|
||||
|
||||
@@ -14,7 +18,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>
|
||||
let compressed_size = compress_to_buffer(
|
||||
uncompressed,
|
||||
&mut compressed[count_size..],
|
||||
DEFAULT_COMPRESSION_LEVEL,
|
||||
compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL),
|
||||
)?;
|
||||
|
||||
compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
|
||||
|
||||
@@ -68,10 +68,11 @@ impl Compressor {
|
||||
}
|
||||
}
|
||||
#[inline]
|
||||
pub(crate) fn compress_into(
|
||||
pub(crate) fn compress(
|
||||
&self,
|
||||
uncompressed: &[u8],
|
||||
compressed: &mut Vec<u8>,
|
||||
_compression_level: Option<i32>,
|
||||
) -> io::Result<()> {
|
||||
match self {
|
||||
Self::None => {
|
||||
@@ -112,7 +113,11 @@ impl Compressor {
|
||||
Self::Zstd => {
|
||||
#[cfg(feature = "zstd-compression")]
|
||||
{
|
||||
super::compression_zstd_block::compress(uncompressed, compressed)
|
||||
super::compression_zstd_block::compress(
|
||||
uncompressed,
|
||||
compressed,
|
||||
_compression_level,
|
||||
)
|
||||
}
|
||||
#[cfg(not(feature = "zstd-compression"))]
|
||||
{
|
||||
|
||||
@@ -86,7 +86,7 @@ pub mod tests {
|
||||
schema_builder.add_text_field("title", TextOptions::default().set_stored());
|
||||
let schema = schema_builder.build();
|
||||
{
|
||||
let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
|
||||
let mut store_writer = StoreWriter::new(writer, compressor, blocksize, None);
|
||||
for i in 0..num_docs {
|
||||
let mut doc = Document::default();
|
||||
doc.add_field_value(field_body, LOREM.to_string());
|
||||
|
||||
@@ -21,6 +21,7 @@ use crate::DocId;
|
||||
pub struct StoreWriter {
|
||||
compressor: Compressor,
|
||||
block_size: usize,
|
||||
compression_level: Option<i32>,
|
||||
doc: DocId,
|
||||
first_doc_in_block: DocId,
|
||||
offset_index_writer: SkipIndexBuilder,
|
||||
@@ -34,10 +35,16 @@ impl StoreWriter {
|
||||
///
|
||||
/// The store writer will writes blocks on disc as
|
||||
/// document are added.
|
||||
pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
|
||||
pub fn new(
|
||||
writer: WritePtr,
|
||||
compressor: Compressor,
|
||||
block_size: usize,
|
||||
compression_level: Option<i32>,
|
||||
) -> StoreWriter {
|
||||
StoreWriter {
|
||||
compressor,
|
||||
block_size,
|
||||
compression_level,
|
||||
doc: 0,
|
||||
first_doc_in_block: 0,
|
||||
offset_index_writer: SkipIndexBuilder::new(),
|
||||
@@ -129,8 +136,11 @@ impl StoreWriter {
|
||||
fn write_and_compress_block(&mut self) -> io::Result<()> {
|
||||
assert!(self.doc > 0);
|
||||
self.intermediary_buffer.clear();
|
||||
self.compressor
|
||||
.compress_into(&self.current_block[..], &mut self.intermediary_buffer)?;
|
||||
self.compressor.compress(
|
||||
&self.current_block[..],
|
||||
&mut self.intermediary_buffer,
|
||||
self.compression_level,
|
||||
)?;
|
||||
let start_offset = self.writer.written_bytes() as usize;
|
||||
self.writer.write_all(&self.intermediary_buffer)?;
|
||||
let end_offset = self.writer.written_bytes() as usize;
|
||||
|
||||
Reference in New Issue
Block a user