diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index d05a920562..926a8c8328 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -502,9 +502,9 @@ async fn test_region_usage() {
     flush_region(&engine, region_id, None).await;
 
     let region_stat = region.region_usage().await;
-    assert!(region_stat.wal_usage == 0);
-    assert_eq!(region_stat.sst_usage, 2827);
+    assert_eq!(region_stat.wal_usage, 0);
+    assert_eq!(region_stat.sst_usage, 2742);
 
     // region total usage
-    assert_eq!(region_stat.disk_usage(), 3833);
+    assert_eq!(region_stat.disk_usage(), 3748);
 }
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 14d6a9da3e..aa68a1e1c6 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -19,7 +19,7 @@ use common_time::Timestamp;
 use object_store::ObjectStore;
 use parquet::basic::{Compression, Encoding, ZstdLevel};
 use parquet::file::metadata::KeyValue;
-use parquet::file::properties::WriterProperties;
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::schema::types::ColumnPath;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
@@ -64,26 +64,15 @@ impl ParquetWriter {
     pub async fn write_all(&mut self, opts: &WriteOptions) -> Result<Option<SstInfo>> {
         let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
         let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
-        let ts_column = self.metadata.time_index_column();
 
         // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
         let props_builder = WriterProperties::builder()
             .set_key_value_metadata(Some(vec![key_value_meta]))
             .set_compression(Compression::ZSTD(ZstdLevel::default()))
             .set_encoding(Encoding::PLAIN)
-            .set_max_row_group_size(opts.row_group_size)
-            .set_column_encoding(
-                ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]),
-                Encoding::DELTA_BINARY_PACKED,
-            )
-            .set_column_dictionary_enabled(
-                ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]),
-                false,
-            )
-            .set_column_encoding(
-                ColumnPath::new(vec![ts_column.column_schema.name.clone()]),
-                Encoding::DELTA_BINARY_PACKED,
-            );
+            .set_max_row_group_size(opts.row_group_size);
+
+        let props_builder = Self::customize_column_config(props_builder, &self.metadata);
         let writer_props = props_builder.build();
 
         let write_format = WriteFormat::new(self.metadata.clone());
@@ -125,6 +114,34 @@ impl ParquetWriter {
             num_rows: stats.num_rows,
         }))
     }
+
+    /// Applies per-column writer properties on top of the settings already present on `builder`.
+    ///
+    /// The sequence column (`SEQUENCE_COLUMN_NAME`) and the region's time index column are both
+    /// set to `DELTA_BINARY_PACKED` encoding with dictionary encoding disabled.
+    /// NOTE(review): presumably the dictionary is disabled so the requested encoding is used as
+    /// the primary encoding rather than only as the dictionary fallback; confirm against the
+    /// parquet `WriterPropertiesBuilder` documentation.
+    ///
+    /// Only the time index column's name is read from `region_metadata`; cardinality is unused.
+    fn customize_column_config(
+        builder: WriterPropertiesBuilder,
+        region_metadata: &RegionMetadataRef,
+    ) -> WriterPropertiesBuilder {
+        let ts_col = ColumnPath::new(vec![region_metadata
+            .time_index_column()
+            .column_schema
+            .name
+            .clone()]);
+        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
+
+        // Each path is used twice: clone for the first setter, move into the second.
+        builder
+            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(seq_col, false)
+            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(ts_col, false)
+    }
 }
 
 #[derive(Default)]