From 29b9b7db0c38a710a4268334eea4518c1911128e Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 6 Nov 2024 18:58:20 +0800 Subject: [PATCH] feat: support compression method --- src/mito2/src/compaction/compactor.rs | 1 + src/mito2/src/config.rs | 12 ++++++++++++ src/mito2/src/flush.rs | 1 + src/mito2/src/sst/parquet.rs | 4 ++++ src/mito2/src/sst/parquet/writer.rs | 8 +++++++- tests-integration/tests/http.rs | 1 + 6 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 004b223053..d54b14d4fd 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -259,6 +259,7 @@ impl Compactor for DefaultCompactor { let write_opts = WriteOptions { write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, + compression_method: compaction_region.engine_config.compression_method, ..Default::default() }; diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index f36d9dd221..5dff70b5b6 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -131,6 +131,17 @@ pub struct MitoConfig { /// Skip wal pub skip_wal: bool, + /// SST compression method. + pub compression_method: CompressionMethod, +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum CompressionMethod { + #[default] + Zstd, + Lz4, + None, } impl Default for MitoConfig { @@ -165,6 +176,7 @@ impl Default for MitoConfig { memtable: MemtableConfig::default(), min_compaction_interval: Duration::from_secs(0), skip_wal: false, + compression_method: CompressionMethod::Zstd, }; // Adjust buffer and cache size according to system memory if we can. diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 9606e92d04..13b118e4a7 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -321,6 +321,7 @@ impl RegionFlushTask { let mut write_opts = WriteOptions { write_buffer_size: self.engine_config.sst_write_buffer_size, + compression_method: self.engine_config.compression_method, ..Default::default() }; if let Some(row_group_size) = self.row_group_size { diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index ae51a0d37c..eab471d786 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use common_base::readable_size::ReadableSize; use parquet::file::metadata::ParquetMetaData; +use crate::config::CompressionMethod; use crate::sst::file::FileTimeRange; use crate::sst::index::IndexOutput; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; @@ -49,6 +50,8 @@ pub struct WriteOptions { pub write_buffer_size: ReadableSize, /// Row group size. pub row_group_size: usize, + /// Compression method. + pub compression_method: CompressionMethod, } impl Default for WriteOptions { @@ -56,6 +59,7 @@ impl Default for WriteOptions { WriteOptions { write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, row_group_size: DEFAULT_ROW_GROUP_SIZE, + compression_method: CompressionMethod::default(), } } } diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 1d63f5e3d0..bc50401906 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -34,6 +34,7 @@ use store_api::storage::consts::SEQUENCE_COLUMN_NAME; use tokio::io::AsyncWrite; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; +use crate::config::CompressionMethod; use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu}; use crate::read::{Batch, Source}; use crate::sst::index::Indexer; @@ -217,9 +218,14 @@ where let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json); // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid. + let compression = match opts.compression_method { + CompressionMethod::Zstd => Compression::ZSTD(ZstdLevel::default()), + CompressionMethod::Lz4 => Compression::LZ4_RAW, + CompressionMethod::None => Compression::UNCOMPRESSED, + }; let props_builder = WriterProperties::builder() .set_key_value_metadata(Some(vec![key_value_meta])) - .set_compression(Compression::ZSTD(ZstdLevel::default())) + .set_compression(compression) .set_encoding(Encoding::PLAIN) .set_max_row_group_size(opts.row_group_size); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 314cd05e72..99d8d03ca3 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -898,6 +898,7 @@ parallel_scan_channel_size = 32 allow_stale_entries = false min_compaction_interval = "0s" skip_wal = false +compression_method = "zstd" [region_engine.mito.index] aux_path = ""