feat: support compression method

This commit is contained in:
evenyag
2024-11-06 18:58:20 +08:00
parent a66909a562
commit 29b9b7db0c
6 changed files with 26 additions and 1 deletion

View File

@@ -259,6 +259,7 @@ impl Compactor for DefaultCompactor {
let write_opts = WriteOptions {
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
compression_method: compaction_region.engine_config.compression_method,
..Default::default()
};

View File

@@ -131,6 +131,17 @@ pub struct MitoConfig {
/// Skip wal
pub skip_wal: bool,
/// SST compression method.
pub compression_method: CompressionMethod,
}
/// Compression codec used when writing SST (Parquet) files.
///
/// Spelled in snake_case in configuration files, e.g. `compression_method = "zstd"`.
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum CompressionMethod {
    /// Zstandard compression; maps to `Compression::ZSTD` at the default zstd level.
    /// This is the default method.
    #[default]
    Zstd,
    /// LZ4 compression; maps to `Compression::LZ4_RAW` (raw block format).
    Lz4,
    /// No compression; maps to `Compression::UNCOMPRESSED`.
    None,
}
impl Default for MitoConfig {
@@ -165,6 +176,7 @@ impl Default for MitoConfig {
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
skip_wal: false,
compression_method: CompressionMethod::Zstd,
};
// Adjust buffer and cache size according to system memory if we can.

View File

@@ -321,6 +321,7 @@ impl RegionFlushTask {
let mut write_opts = WriteOptions {
write_buffer_size: self.engine_config.sst_write_buffer_size,
compression_method: self.engine_config.compression_method,
..Default::default()
};
if let Some(row_group_size) = self.row_group_size {

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use crate::config::CompressionMethod;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
@@ -49,6 +50,8 @@ pub struct WriteOptions {
pub write_buffer_size: ReadableSize,
/// Row group size.
pub row_group_size: usize,
/// Compression method.
pub compression_method: CompressionMethod,
}
impl Default for WriteOptions {
@@ -56,6 +59,7 @@ impl Default for WriteOptions {
WriteOptions {
write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
row_group_size: DEFAULT_ROW_GROUP_SIZE,
compression_method: CompressionMethod::default(),
}
}
}

View File

@@ -34,6 +34,7 @@ use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use crate::config::CompressionMethod;
use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
use crate::read::{Batch, Source};
use crate::sst::index::Indexer;
@@ -217,9 +218,14 @@ where
let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
// TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
let compression = match opts.compression_method {
CompressionMethod::Zstd => Compression::ZSTD(ZstdLevel::default()),
CompressionMethod::Lz4 => Compression::LZ4_RAW,
CompressionMethod::None => Compression::UNCOMPRESSED,
};
let props_builder = WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_compression(compression)
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(opts.row_group_size);

View File

@@ -898,6 +898,7 @@ parallel_scan_channel_size = 32
allow_stale_entries = false
min_compaction_interval = "0s"
skip_wal = false
compression_method = "zstd"
[region_engine.mito.index]
aux_path = ""