diff --git a/Cargo.toml b/Cargo.toml index cab07efcd..d6f83f800 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,13 +112,16 @@ debug-assertions = true overflow-checks = true [features] -default = ["mmap", "stopwords", "lz4-compression"] +default = ["mmap", "stopwords", "lz4-compression", "columnar-zstd-compression"] mmap = ["fs4", "tempfile", "memmap2"] stopwords = [] lz4-compression = ["lz4_flex"] zstd-compression = ["zstd"] +# enable zstd-compression in columnar (and sstable) +columnar-zstd-compression = ["columnar/zstd-compression"] + failpoints = ["fail", "fail/failpoints"] unstable = [] # useful for benches. diff --git a/columnar/Cargo.toml b/columnar/Cargo.toml index b17606c08..7c741b0b5 100644 --- a/columnar/Cargo.toml +++ b/columnar/Cargo.toml @@ -33,6 +33,6 @@ harness = false name = "bench_access" harness = false - [features] unstable = [] +zstd-compression = ["sstable/zstd-compression"] diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml index 2b2928bb2..52a183d6c 100644 --- a/sstable/Cargo.toml +++ b/sstable/Cargo.toml @@ -16,7 +16,10 @@ itertools = "0.14.0" tantivy-bitpacker = { version= "0.6", path="../bitpacker" } tantivy-fst = "0.5" # experimental gives us access to Decompressor::upper_bound -zstd = { version = "0.13", features = ["experimental"] } +zstd = { version = "0.13", optional = true, features = ["experimental"] } + +[features] +zstd-compression = ["zstd"] [dev-dependencies] proptest = "1" diff --git a/sstable/src/block_reader.rs b/sstable/src/block_reader.rs index 8a86e83f7..0de2b44cd 100644 --- a/sstable/src/block_reader.rs +++ b/sstable/src/block_reader.rs @@ -2,6 +2,7 @@ use std::io::{self, Read}; use std::ops::Range; use common::OwnedBytes; +#[cfg(feature = "zstd-compression")] use zstd::bulk::Decompressor; pub struct BlockReader { @@ -82,13 +83,23 @@ impl BlockReader { )); } if compress == 1 { - let required_capacity = - Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024); - self.buffer.reserve(required_capacity); - Decompressor::new()? - .decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?; + #[cfg(feature = "zstd-compression")] + { + let required_capacity = + Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024); + self.buffer.reserve(required_capacity); + Decompressor::new()? + .decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?; - self.reader.advance(block_len); + self.reader.advance(block_len); + } + + if cfg!(not(feature = "zstd-compression")) { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "zstd-compression feature is not enabled", + )); + } } else { self.buffer.resize(block_len, 0u8); self.reader.read_exact(&mut self.buffer[..])?; diff --git a/sstable/src/delta.rs b/sstable/src/delta.rs index 9542c7905..4d4317ea8 100644 --- a/sstable/src/delta.rs +++ b/sstable/src/delta.rs @@ -2,6 +2,7 @@ use std::io::{self, BufWriter, Write}; use std::ops::Range; use common::{CountingWriter, OwnedBytes}; +#[cfg(feature = "zstd-compression")] use zstd::bulk::Compressor; use super::value::ValueWriter; @@ -53,25 +54,28 @@ where let block_len = buffer.len() + self.block.len(); - if block_len > 2048 { - buffer.extend_from_slice(&self.block); - self.block.clear(); + if cfg!(feature = "zstd-compression") && block_len > 2048 { + #[cfg(feature = "zstd-compression")] + { + buffer.extend_from_slice(&self.block); + self.block.clear(); - let max_len = zstd::zstd_safe::compress_bound(buffer.len()); - self.block.reserve(max_len); - Compressor::new(3)?.compress_to_buffer(buffer, &mut self.block)?; + let max_len = zstd::zstd_safe::compress_bound(buffer.len()); + self.block.reserve(max_len); + Compressor::new(3)?.compress_to_buffer(buffer, &mut self.block)?; - // verify compression had a positive impact - if self.block.len() < buffer.len() { - self.write - .write_all(&(self.block.len() as u32 + 1).to_le_bytes())?; - self.write.write_all(&[1])?; - self.write.write_all(&self.block[..])?; - } else { - self.write - .write_all(&(block_len as u32 + 1).to_le_bytes())?; - self.write.write_all(&[0])?; - self.write.write_all(&buffer[..])?; + // verify compression had a positive impact + if self.block.len() < buffer.len() { + self.write + .write_all(&(self.block.len() as u32 + 1).to_le_bytes())?; + self.write.write_all(&[1])?; + self.write.write_all(&self.block[..])?; + } else { + self.write + .write_all(&(block_len as u32 + 1).to_le_bytes())?; + self.write.write_all(&[0])?; + self.write.write_all(&buffer[..])?; + } } } else { self.write