Merge pull request #1378 from quickwit-oss/test_compression

enable setting compression level
PSeitz
2022-06-10 11:10:07 +08:00
committed by GitHub
9 changed files with 389 additions and 115 deletions


@@ -326,7 +326,7 @@ pub struct IndexMeta {
pub payload: Option<String>,
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
struct UntrackedIndexMeta {
pub segments: Vec<InnerSegmentMeta>,
#[serde(default)]
@@ -395,6 +395,7 @@ mod tests {
use super::IndexMeta;
use crate::core::index_meta::UntrackedIndexMeta;
use crate::schema::{Schema, TEXT};
use crate::store::ZstdCompressor;
use crate::{IndexSettings, IndexSortByField, Order};
#[test]
@@ -428,4 +429,60 @@ mod tests {
assert_eq!(index_metas.schema, deser_meta.schema);
assert_eq!(index_metas.opstamp, deser_meta.opstamp);
}
#[test]
fn test_serialize_metas_zstd_compressor() {
let schema = {
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("text", TEXT);
schema_builder.build()
};
let index_metas = IndexMeta {
index_settings: IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "text".to_string(),
order: Order::Asc,
}),
docstore_compression: crate::store::Compressor::Zstd(ZstdCompressor {
compression_level: Some(4),
}),
docstore_blocksize: 1_000_000,
},
segments: Vec::new(),
schema,
opstamp: 0u64,
payload: None,
};
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(compression_level=4)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
);
let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();
assert_eq!(index_metas.index_settings, deser_meta.index_settings);
assert_eq!(index_metas.schema, deser_meta.schema);
assert_eq!(index_metas.opstamp, deser_meta.opstamp);
}
#[test]
fn test_serialize_metas_invalid_comp() {
let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zsstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;
let err = serde_json::from_str::<UntrackedIndexMeta>(&json).unwrap_err();
assert_eq!(
err.to_string(),
"unknown variant `zsstd`, expected one of `none`, `lz4`, `brotli`, `snappy`, `zstd`, \
`zstd(compression_level=5)` at line 1 column 96"
.to_string()
);
let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(bla=10)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;
let err = serde_json::from_str::<UntrackedIndexMeta>(&json).unwrap_err();
assert_eq!(
err.to_string(),
"unknown zstd option \"bla\" at line 1 column 103".to_string()
);
}
}
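
The setting round-tripped above is what users now control. A minimal usage sketch, assuming the `Index::builder()` API and the `tantivy::store` re-exports added later in this diff (anything not shown in the diff is an assumption and may differ by version):

use tantivy::schema::{Schema, TEXT};
use tantivy::store::{Compressor, ZstdCompressor};
use tantivy::{Index, IndexSettings};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("text", TEXT);
    let schema = schema_builder.build();
    // Choose zstd with an explicit level for the doc store; `None` falls back
    // to zstd::DEFAULT_COMPRESSION_LEVEL.
    let settings = IndexSettings {
        docstore_compression: Compressor::Zstd(ZstdCompressor {
            compression_level: Some(4),
        }),
        ..IndexSettings::default()
    };
    let _index = Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()?;
    Ok(())
}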


@@ -1073,7 +1073,7 @@ impl IndexMerger {
//
// take 7 in order to not walk over all checkpoints.
|| store_reader.block_checkpoints().take(7).count() < 6
|| store_reader.compressor() != store_writer.compressor()
|| store_reader.decompressor() != store_writer.compressor().into()
{
for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
let doc_bytes = doc_bytes_res?;
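
Note the comparison is now between the reader's `Decompressor` and the `Decompressor` the writer's `Compressor` maps to, rather than between two `Compressor` values. Since `Compressor::Zstd` now carries a level, a direct comparison would needlessly disable block stacking when only the level differs. A hedged sketch of the invariant, using the `From<Compressor>` conversion introduced later in this diff:

use tantivy::store::{Compressor, Decompressor, ZstdCompressor};

fn stacking_check_sketch() {
    // Converting erases the level: blocks written at the default level can
    // still be stacked into a store that will be written at level 7.
    let writer_side: Decompressor = Compressor::Zstd(ZstdCompressor {
        compression_level: Some(7),
    })
    .into();
    assert_eq!(Decompressor::Zstd, writer_side); // same codec: stacking stays possible
    assert_ne!(Decompressor::Lz4, writer_side); // codec change: blocks are re-compressed
}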


@@ -4,7 +4,11 @@ use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
use zstd::DEFAULT_COMPRESSION_LEVEL;
#[inline]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
pub fn compress(
uncompressed: &[u8],
compressed: &mut Vec<u8>,
compression_level: Option<i32>,
) -> io::Result<()> {
let count_size = std::mem::size_of::<u32>();
let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;
@@ -14,7 +18,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>
let compressed_size = compress_to_buffer(
uncompressed,
&mut compressed[count_size..],
DEFAULT_COMPRESSION_LEVEL,
compression_level.unwrap_or(DEFAULT_COMPRESSION_LEVEL),
)?;
compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
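
The block format is unchanged: a 4-byte little-endian prefix holding the uncompressed length, followed by the zstd-compressed payload. For reference, a hedged sketch of the matching read path (the real decompress function is untouched by this PR and not shown in the diff):

use std::io;
use zstd::bulk::decompress_to_buffer;

pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
    let count_size = std::mem::size_of::<u32>();
    // The writer stored the uncompressed length up front, so the reader can
    // size its output buffer exactly before decompressing.
    let mut prefix = [0u8; 4];
    prefix.copy_from_slice(&compressed[..count_size]);
    let uncompressed_size = u32::from_le_bytes(prefix) as usize;
    decompressed.resize(uncompressed_size, 0);
    decompress_to_buffer(&compressed[count_size..], &mut decompressed[..])?;
    Ok(())
}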


@@ -1,6 +1,6 @@
use std::io;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
pub trait StoreCompressor {
fn compress(&self, uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>;
@@ -12,23 +12,114 @@ pub trait StoreCompressor {
/// The compressor used to compress the doc store.
///
/// The default is `Lz4` (block format), but the effective default depends on the enabled feature flags.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Compressor {
#[serde(rename = "none")]
/// No compression
None,
#[serde(rename = "lz4")]
/// Use the lz4 compressor (block format)
Lz4,
#[serde(rename = "brotli")]
/// Use the brotli compressor
Brotli,
#[serde(rename = "snappy")]
/// Use the snap compressor
Snappy,
#[serde(rename = "zstd")]
/// Use the zstd compressor
Zstd,
Zstd(ZstdCompressor),
}
impl Serialize for Compressor {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
match *self {
Compressor::None => serializer.serialize_str("none"),
Compressor::Lz4 => serializer.serialize_str("lz4"),
Compressor::Brotli => serializer.serialize_str("brotli"),
Compressor::Snappy => serializer.serialize_str("snappy"),
Compressor::Zstd(zstd) => serializer.serialize_str(&zstd.ser_to_string()),
}
}
}
impl<'de> Deserialize<'de> for Compressor {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de> {
let buf = String::deserialize(deserializer)?;
let compressor = match buf.as_str() {
"none" => Compressor::None,
"lz4" => Compressor::Lz4,
"brotli" => Compressor::Brotli,
"snappy" => Compressor::Snappy,
_ => {
if buf.starts_with("zstd") {
Compressor::Zstd(
ZstdCompressor::deser_from_str(&buf).map_err(serde::de::Error::custom)?,
)
} else {
return Err(serde::de::Error::unknown_variant(
&buf,
&[
"none",
"lz4",
"brotli",
"snappy",
"zstd",
"zstd(compression_level=5)",
],
));
}
}
};
Ok(compressor)
}
}
#[derive(Clone, Default, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
/// The Zstd compressor, with optional compression level.
pub struct ZstdCompressor {
/// The compression level. If unset, defaults to `zstd::DEFAULT_COMPRESSION_LEVEL` (= 3).
pub compression_level: Option<i32>,
}
impl ZstdCompressor {
fn deser_from_str(val: &str) -> Result<ZstdCompressor, String> {
if !val.starts_with("zstd") {
return Err(format!("needs to start with zstd, but got {}", val));
}
if val == "zstd" {
return Ok(ZstdCompressor::default());
}
let options = &val["zstd".len() + 1..val.len() - 1];
let mut compressor = ZstdCompressor::default();
for option in options.split(',') {
let (opt_name, value) = option
.split_once('=')
.ok_or_else(|| format!("no '=' found in option {:?}", option))?;
match opt_name {
"compression_level" => {
let value = value.parse::<i32>().map_err(|err| {
format!(
"Could not parse value {} of option {}, e: {}",
value, opt_name, err
)
})?;
compressor.compression_level = Some(value);
}
_ => {
return Err(format!("unknown zstd option {:?}", opt_name));
}
}
}
Ok(compressor)
}
fn ser_to_string(&self) -> String {
if let Some(compression_level) = self.compression_level {
format!("zstd(compression_level={})", compression_level)
} else {
"zstd".to_string()
}
}
}
impl Default for Compressor {
@@ -40,7 +131,7 @@ impl Default for Compressor {
} else if cfg!(feature = "snappy-compression") {
Compressor::Snappy
} else if cfg!(feature = "zstd-compression") {
Compressor::Zstd
Compressor::Zstd(ZstdCompressor::default())
} else {
Compressor::None
}
@@ -48,25 +139,6 @@ impl Default for Compressor {
}
impl Compressor {
pub(crate) fn from_id(id: u8) -> Compressor {
match id {
0 => Compressor::None,
1 => Compressor::Lz4,
2 => Compressor::Brotli,
3 => Compressor::Snappy,
4 => Compressor::Zstd,
_ => panic!("unknown compressor id {:?}", id),
}
}
pub(crate) fn get_id(&self) -> u8 {
match self {
Self::None => 0,
Self::Lz4 => 1,
Self::Brotli => 2,
Self::Snappy => 3,
Self::Zstd => 4,
}
}
#[inline]
pub(crate) fn compress_into(
&self,
@@ -109,71 +181,14 @@ impl Compressor {
panic!("snappy-compression feature flag not activated");
}
}
Self::Zstd => {
Self::Zstd(_zstd_compressor) => {
#[cfg(feature = "zstd-compression")]
{
super::compression_zstd_block::compress(uncompressed, compressed)
}
#[cfg(not(feature = "zstd-compression"))]
{
panic!("zstd-compression feature flag not activated");
}
}
}
}
pub(crate) fn decompress(&self, compressed_block: &[u8]) -> io::Result<Vec<u8>> {
let mut decompressed_block = vec![];
self.decompress_into(compressed_block, &mut decompressed_block)?;
Ok(decompressed_block)
}
#[inline]
pub(crate) fn decompress_into(
&self,
compressed: &[u8],
decompressed: &mut Vec<u8>,
) -> io::Result<()> {
match self {
Self::None => {
decompressed.clear();
decompressed.extend_from_slice(compressed);
Ok(())
}
Self::Lz4 => {
#[cfg(feature = "lz4-compression")]
{
super::compression_lz4_block::decompress(compressed, decompressed)
}
#[cfg(not(feature = "lz4-compression"))]
{
panic!("lz4-compression feature flag not activated");
}
}
Self::Brotli => {
#[cfg(feature = "brotli-compression")]
{
super::compression_brotli::decompress(compressed, decompressed)
}
#[cfg(not(feature = "brotli-compression"))]
{
panic!("brotli-compression feature flag not activated");
}
}
Self::Snappy => {
#[cfg(feature = "snappy-compression")]
{
super::compression_snap::decompress(compressed, decompressed)
}
#[cfg(not(feature = "snappy-compression"))]
{
panic!("snappy-compression feature flag not activated");
}
}
Self::Zstd => {
#[cfg(feature = "zstd-compression")]
{
super::compression_zstd_block::decompress(compressed, decompressed)
super::compression_zstd_block::compress(
uncompressed,
compressed,
_zstd_compressor.compression_level,
)
}
#[cfg(not(feature = "zstd-compression"))]
{
@@ -183,3 +198,55 @@ impl Compressor {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn zstd_serde_roundtrip() {
let compressor = ZstdCompressor {
compression_level: Some(15),
};
assert_eq!(
ZstdCompressor::deser_from_str(&compressor.ser_to_string()).unwrap(),
compressor
);
assert_eq!(
ZstdCompressor::deser_from_str(&ZstdCompressor::default().ser_to_string()).unwrap(),
ZstdCompressor::default()
);
}
#[test]
fn deser_zstd_test() {
assert_eq!(
ZstdCompressor::deser_from_str("zstd").unwrap(),
ZstdCompressor::default()
);
assert!(ZstdCompressor::deser_from_str("zzstd").is_err());
assert!(ZstdCompressor::deser_from_str("zzstd()").is_err());
assert_eq!(
ZstdCompressor::deser_from_str("zstd(compression_level=15)").unwrap(),
ZstdCompressor {
compression_level: Some(15)
}
);
assert_eq!(
ZstdCompressor::deser_from_str("zstd(compresion_level=15)").unwrap_err(),
"unknown zstd option \"compresion_level\""
);
assert_eq!(
ZstdCompressor::deser_from_str("zstd(compression_level->2)").unwrap_err(),
"no '=' found in option \"compression_level->2\""
);
assert_eq!(
ZstdCompressor::deser_from_str("zstd(compression_level=over9000)").unwrap_err(),
"Could not parse value over9000 of option compression_level, e: invalid digit found \
in string"
);
}
}
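
Because `Compressor` still serializes to a single string, the new level rides along inside that string and existing `meta.json` files keep deserializing. A hedged round-trip sketch with serde_json:

use tantivy::store::{Compressor, ZstdCompressor};

fn compressor_string_forms() {
    // Plain variants keep their lowercase names...
    assert_eq!(serde_json::to_string(&Compressor::Lz4).unwrap(), "\"lz4\"");
    // ...while zstd encodes its optional level into the same string field.
    let zstd = Compressor::Zstd(ZstdCompressor {
        compression_level: Some(7),
    });
    let json = serde_json::to_string(&zstd).unwrap();
    assert_eq!(json, "\"zstd(compression_level=7)\"");
    let back: Compressor = serde_json::from_str(&json).unwrap();
    assert_eq!(back, zstd);
}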

src/store/decompressors.rs (new file, 140 lines)

@@ -0,0 +1,140 @@
use std::io;
use serde::{Deserialize, Serialize};
use super::Compressor;
pub trait StoreCompressor {
fn compress(&self, uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>;
fn decompress(&self, compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()>;
fn get_compressor_id() -> u8;
}
/// The decompressor, deserialized from the doc store footer when opening an index.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Decompressor {
/// No compression
None,
/// Use the lz4 compressor (block format)
Lz4,
/// Use the brotli compressor
Brotli,
/// Use the snap compressor
Snappy,
/// Use the zstd compressor
Zstd,
}
impl From<Compressor> for Decompressor {
fn from(compressor: Compressor) -> Self {
match compressor {
Compressor::None => Decompressor::None,
Compressor::Lz4 => Decompressor::Lz4,
Compressor::Brotli => Decompressor::Brotli,
Compressor::Snappy => Decompressor::Snappy,
Compressor::Zstd(_) => Decompressor::Zstd,
}
}
}
impl Decompressor {
pub(crate) fn from_id(id: u8) -> Decompressor {
match id {
0 => Decompressor::None,
1 => Decompressor::Lz4,
2 => Decompressor::Brotli,
3 => Decompressor::Snappy,
4 => Decompressor::Zstd,
_ => panic!("unknown decompressor id {:?}", id),
}
}
pub(crate) fn get_id(&self) -> u8 {
match self {
Self::None => 0,
Self::Lz4 => 1,
Self::Brotli => 2,
Self::Snappy => 3,
Self::Zstd => 4,
}
}
pub(crate) fn decompress(&self, compressed_block: &[u8]) -> io::Result<Vec<u8>> {
let mut decompressed_block = vec![];
self.decompress_into(compressed_block, &mut decompressed_block)?;
Ok(decompressed_block)
}
#[inline]
pub(crate) fn decompress_into(
&self,
compressed: &[u8],
decompressed: &mut Vec<u8>,
) -> io::Result<()> {
match self {
Self::None => {
decompressed.clear();
decompressed.extend_from_slice(compressed);
Ok(())
}
Self::Lz4 => {
#[cfg(feature = "lz4-compression")]
{
super::compression_lz4_block::decompress(compressed, decompressed)
}
#[cfg(not(feature = "lz4-compression"))]
{
panic!("lz4-compression feature flag not activated");
}
}
Self::Brotli => {
#[cfg(feature = "brotli-compression")]
{
super::compression_brotli::decompress(compressed, decompressed)
}
#[cfg(not(feature = "brotli-compression"))]
{
panic!("brotli-compression feature flag not activated");
}
}
Self::Snappy => {
#[cfg(feature = "snappy-compression")]
{
super::compression_snap::decompress(compressed, decompressed)
}
#[cfg(not(feature = "snappy-compression"))]
{
panic!("snappy-compression feature flag not activated");
}
}
Self::Zstd => {
#[cfg(feature = "zstd-compression")]
{
super::compression_zstd_block::decompress(compressed, decompressed)
}
#[cfg(not(feature = "zstd-compression"))]
{
panic!("zstd-compression feature flag not activated");
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::Compressor;
#[test]
fn compressor_decompressor_id_test() {
assert_eq!(Decompressor::from(Compressor::None), Decompressor::None);
assert_eq!(Decompressor::from(Compressor::Lz4), Decompressor::Lz4);
assert_eq!(Decompressor::from(Compressor::Brotli), Decompressor::Brotli);
assert_eq!(Decompressor::from(Compressor::Snappy), Decompressor::Snappy);
assert_eq!(
Decompressor::from(Compressor::Zstd(Default::default())),
Decompressor::Zstd
);
}
}
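
The numeric ids are persisted in the doc store footer, so they must stay stable across versions; the `From<Compressor>` mapping above guarantees every compressor has a decompressor id to write. A small in-crate check one could add (a sketch, not part of the PR — it would live inside the tests module above):

#[test]
fn decompressor_id_roundtrip() {
    for decompressor in [
        Decompressor::None,
        Decompressor::Lz4,
        Decompressor::Brotli,
        Decompressor::Snappy,
        Decompressor::Zstd,
    ] {
        // from_id/get_id are the footer's wire format and must stay inverse.
        assert_eq!(Decompressor::from_id(decompressor.get_id()), decompressor);
    }
}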


@@ -2,13 +2,13 @@ use std::io;
use common::{BinarySerializable, FixedSize, HasLen};
use super::Decompressor;
use crate::directory::FileSlice;
use crate::store::Compressor;
#[derive(Debug, Clone, PartialEq)]
pub struct DocStoreFooter {
pub offset: u64,
pub compressor: Compressor,
pub decompressor: Decompressor,
}
/// Serialises the footer to a byte-array
@@ -18,7 +18,7 @@ pub struct DocStoreFooter {
impl BinarySerializable for DocStoreFooter {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
BinarySerializable::serialize(&self.offset, writer)?;
BinarySerializable::serialize(&self.compressor.get_id(), writer)?;
BinarySerializable::serialize(&self.decompressor.get_id(), writer)?;
writer.write_all(&[0; 15])?;
Ok(())
}
@@ -30,7 +30,7 @@ impl BinarySerializable for DocStoreFooter {
reader.read_exact(&mut skip_buf)?;
Ok(DocStoreFooter {
offset,
compressor: Compressor::from_id(compressor_id),
decompressor: Decompressor::from_id(compressor_id),
})
}
}
@@ -40,8 +40,11 @@ impl FixedSize for DocStoreFooter {
}
impl DocStoreFooter {
pub fn new(offset: u64, compressor: Compressor) -> Self {
DocStoreFooter { offset, compressor }
pub fn new(offset: u64, decompressor: Decompressor) -> Self {
DocStoreFooter {
offset,
decompressor,
}
}
pub fn extract_footer(file: FileSlice) -> io::Result<(DocStoreFooter, FileSlice)> {
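
The footer keeps its fixed size; only the meaning of the one-byte id changes (it now names the `Decompressor`). A hedged sketch of the on-disk layout implied by `serialize` above, assuming little-endian integer encoding as used for the zstd length prefix elsewhere in this PR:

// [offset: u64][decompressor id: u8][15 reserved zero bytes] = 24 bytes
fn footer_bytes(offset: u64, decompressor_id: u8) -> [u8; 24] {
    let mut buf = [0u8; 24];
    buf[..8].copy_from_slice(&offset.to_le_bytes()); // endianness is an assumption
    buf[8] = decompressor_id;
    // bytes 9..24 stay zero: padding reserved for future format changes
    buf
}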


@@ -33,11 +33,13 @@
//! !
mod compressors;
mod decompressors;
mod footer;
mod index;
mod reader;
mod writer;
pub use self::compressors::Compressor;
pub use self::compressors::{Compressor, ZstdCompressor};
pub use self::decompressors::Decompressor;
pub use self::reader::StoreReader;
pub use self::writer::StoreWriter;
@@ -196,7 +198,7 @@ pub mod tests {
#[cfg(feature = "zstd-compression")]
#[test]
fn test_store_zstd() -> crate::Result<()> {
test_store(Compressor::Zstd, BLOCK_SIZE)
test_store(Compressor::Zstd(ZstdCompressor::default()), BLOCK_SIZE)
}
#[test]
@@ -267,8 +269,8 @@ pub mod tests {
index.reader().unwrap().searcher().segment_readers()[0]
.get_store_reader()
.unwrap()
.compressor(),
Compressor::Lz4
.decompressor(),
Decompressor::Lz4
);
// Change compressor, this disables stacking on merging
let index_settings = index.settings_mut();
@@ -294,7 +296,7 @@ pub mod tests {
LOREM.to_string()
);
}
assert_eq!(store.compressor(), Compressor::Snappy);
assert_eq!(store.decompressor(), Decompressor::Snappy);
Ok(())
}


@@ -8,7 +8,7 @@ use ownedbytes::OwnedBytes;
use super::footer::DocStoreFooter;
use super::index::SkipIndex;
use super::Compressor;
use super::Decompressor;
use crate::directory::FileSlice;
use crate::error::DataCorruption;
use crate::fastfield::AliveBitSet;
@@ -23,7 +23,7 @@ type Block = OwnedBytes;
/// Reads document off tantivy's [`Store`](./index.html)
pub struct StoreReader {
compressor: Compressor,
decompressor: Decompressor,
data: FileSlice,
skip_index: Arc<SkipIndex>,
space_usage: StoreSpaceUsage,
@@ -87,7 +87,7 @@ impl StoreReader {
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::open(index_data);
Ok(StoreReader {
compressor: footer.compressor,
decompressor: footer.decompressor,
data: data_file,
cache: BlockCache {
cache: Mutex::new(LruCache::new(LRU_CACHE_CAPACITY)),
@@ -103,8 +103,8 @@ impl StoreReader {
self.skip_index.checkpoints()
}
pub(crate) fn compressor(&self) -> Compressor {
self.compressor
pub(crate) fn decompressor(&self) -> Decompressor {
self.decompressor
}
/// Returns the cache hit and miss statistics of the store reader.
@@ -141,7 +141,7 @@ impl StoreReader {
let compressed_block = self.get_compressed_block(checkpoint)?;
let decompressed_block =
OwnedBytes::new(self.compressor.decompress(compressed_block.as_ref())?);
OwnedBytes::new(self.decompressor.decompress(compressed_block.as_ref())?);
self.cache
.put_into_cache(cache_key, decompressed_block.clone());
@@ -321,7 +321,7 @@ impl StoreReader {
.await?;
let decompressed_block =
OwnedBytes::new(self.compressor.decompress(compressed_block.as_ref())?);
OwnedBytes::new(self.decompressor.decompress(compressed_block.as_ref())?);
self.cache
.put_into_cache(cache_key, decompressed_block.clone());
@@ -351,6 +351,7 @@ mod tests {
use crate::directory::RamDirectory;
use crate::schema::{Document, Field};
use crate::store::tests::write_lorem_ipsum_store;
use crate::store::Compressor;
use crate::Directory;
const BLOCK_SIZE: usize = 16_384;


@@ -5,7 +5,7 @@ use common::{BinarySerializable, CountingWriter, VInt};
use super::compressors::Compressor;
use super::footer::DocStoreFooter;
use super::index::SkipIndexBuilder;
use super::StoreReader;
use super::{Decompressor, StoreReader};
use crate::directory::{TerminatingWrite, WritePtr};
use crate::schema::Document;
use crate::store::index::Checkpoint;
@@ -152,7 +152,7 @@ impl StoreWriter {
self.write_and_compress_block()?;
}
let header_offset: u64 = self.writer.written_bytes() as u64;
let footer = DocStoreFooter::new(header_offset, self.compressor);
let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
self.offset_index_writer.write(&mut self.writer)?;
footer.serialize(&mut self.writer)?;
self.writer.terminate()
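
Taken together: the `Compressor` (including the optional zstd level) is a write-side setting stored in `meta.json`, while the footer and the `StoreReader` only deal in `Decompressor`. Re-opening or merging an index therefore never depends on the level it was written with, and changing the level later does not invalidate existing stores.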