diff --git a/Cargo.lock b/Cargo.lock index a1dfa0d46b..619a48eba0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7284,8 +7284,7 @@ checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" [[package]] name = "memcomparable" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "376101dbd964fc502d5902216e180f92b3d003b5cc3d2e40e044eb5470fca677" +source = "git+https://github.com/v0y4g3r/memcomparable.git?rev=a07122dc03556bbd88ad66234cbea7efd3b23efb#a07122dc03556bbd88ad66234cbea7efd3b23efb" dependencies = [ "bytes", "serde", @@ -7603,7 +7602,6 @@ dependencies = [ "itertools 0.14.0", "lazy_static", "log-store", - "memcomparable", "mito-codec", "moka", "object-store", diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 6bbb1a6d54..fbaf7f6e3d 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -196,7 +196,10 @@ pub async fn stream_to_parquet( concurrency: usize, ) -> Result { let write_props = column_wise_config( - WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())), + WriterProperties::builder() + .set_compression(Compression::ZSTD(ZstdLevel::default())) + .set_statistics_truncate_length(None) + .set_column_index_truncate_length(None), schema, ) .build(); diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index 156ff2fc9d..72dd4535da 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -127,10 +127,10 @@ mod tests { r#" ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20) } ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8) } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3429, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8) } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3413, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) } ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }"# +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3413, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }"# ); // list from storage diff --git a/src/mito-codec/Cargo.toml b/src/mito-codec/Cargo.toml index a96d24d963..99a46e8ac9 100644 --- a/src/mito-codec/Cargo.toml +++ b/src/mito-codec/Cargo.toml @@ -19,7 +19,7 @@ common-recordbatch.workspace = true common-telemetry.workspace = true common-time.workspace = true datatypes.workspace = true -memcomparable = "0.2" +memcomparable = { git = "https://github.com/v0y4g3r/memcomparable.git", rev = "a07122dc03556bbd88ad66234cbea7efd3b23efb" } paste.workspace = true serde.workspace = true snafu.workspace = true diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 1bbc481965..14be8e464c 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -50,7 +50,6 @@ index.workspace = true itertools.workspace = true lazy_static = "1.4" log-store = { workspace = true } -memcomparable = "0.2" mito-codec.workspace = true moka = { workspace = true, features = ["sync", "future"] } object-store.workspace = true diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index febd2be45e..1a0806e2f3 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -53,6 +53,8 @@ mod prune_test; #[cfg(test)] mod row_selector_test; #[cfg(test)] +mod scan_corrupt; +#[cfg(test)] mod scan_test; #[cfg(test)] mod set_role_state_test; diff --git a/src/mito2/src/engine/scan_corrupt.rs b/src/mito2/src/engine/scan_corrupt.rs new file mode 100644 index 0000000000..46ac635f8f --- /dev/null +++ b/src/mito2/src/engine/scan_corrupt.rs @@ -0,0 +1,112 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::helper::row; +use api::v1::value::ValueData; +use api::v1::Rows; +use datatypes::value::Value; +use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec}; +use parquet::file::statistics::Statistics; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{PathType, RegionRequest}; +use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; +use store_api::storage::RegionId; + +use crate::config::MitoConfig; +use crate::sst::parquet::reader::ParquetReaderBuilder; +use crate::test_util; +use crate::test_util::{CreateRequestBuilder, TestEnv}; + +#[tokio::test] +async fn test_scan_corrupt() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("test_write_stats_with_long_string_value").await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let table_dir = request.table_dir.clone(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let build_rows = |start: i32, end: i32| { + (start..end) + .map(|i| { + row(vec![ + ValueData::StringValue(i.to_string().repeat(128)), + ValueData::F64Value(i as f64), + ValueData::TimestampMillisecondValue(i as i64 * 1000), + ]) + }) + .collect() + }; + let put_rows = async |start, end| { + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(start, end), + }; + test_util::put_rows(&engine, region_id, rows).await; + test_util::flush_region(&engine, region_id, None).await; + }; + put_rows(0, 3).await; + + let region = engine.get_region(region_id).unwrap(); + + let version = region.version(); + let file = version + .ssts + .levels() + .iter() + .flat_map(|l| l.files.values()) + .next() + .unwrap(); + + let object_store = env.get_object_store().unwrap(); + let reader = ParquetReaderBuilder::new( + table_dir.clone(), + PathType::Bare, + file.clone(), + object_store.clone(), + ) + .build() + .await + .unwrap(); + + let codec = DensePrimaryKeyCodec::new(&version.metadata); + for r in reader.parquet_metadata().row_groups() { + for c in r.columns() { + if c.column_descr().name() == PRIMARY_KEY_COLUMN_NAME { + let stats = c.statistics().unwrap(); + let Statistics::ByteArray(b) = stats else { + unreachable!() + }; + let min = codec + .decode_leftmost(b.min_bytes_opt().unwrap()) + .unwrap() + .unwrap(); + assert_eq!(Value::String("0".repeat(128).into()), min); + + let max = codec + .decode_leftmost(b.max_bytes_opt().unwrap()) + .unwrap() + .unwrap(); + assert_eq!(Value::String("2".repeat(128).into()), max); + } + } + } +} diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index d57e76ea4e..e0e28c63d2 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -554,6 +554,8 @@ impl BulkPartEncoder { WriterProperties::builder() .set_write_batch_size(row_group_size) .set_max_row_group_size(row_group_size) + .set_column_index_truncate_length(None) + .set_statistics_truncate_length(None) .build(), ); Self { diff --git a/src/mito2/src/memtable/partition_tree/data.rs b/src/mito2/src/memtable/partition_tree/data.rs index 016583277b..8c56757f33 100644 --- a/src/mito2/src/memtable/partition_tree/data.rs +++ b/src/mito2/src/memtable/partition_tree/data.rs @@ -774,7 +774,9 @@ impl<'a> DataPartEncoder<'a> { .set_column_encoding(sequence_col.clone(), Encoding::DELTA_BINARY_PACKED) .set_column_dictionary_enabled(sequence_col, false) .set_column_encoding(op_type_col.clone(), Encoding::DELTA_BINARY_PACKED) - .set_column_dictionary_enabled(op_type_col, true); + .set_column_dictionary_enabled(op_type_col, true) + .set_column_index_truncate_length(None) + .set_statistics_truncate_length(None); builder.build() } diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index bec76cf128..0715f1aac4 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -329,7 +329,9 @@ where .set_key_value_metadata(Some(vec![key_value_meta])) .set_compression(Compression::ZSTD(ZstdLevel::default())) .set_encoding(Encoding::PLAIN) - .set_max_row_group_size(opts.row_group_size); + .set_max_row_group_size(opts.row_group_size) + .set_column_index_truncate_length(None) + .set_statistics_truncate_length(None); let props_builder = Self::customize_column_config(props_builder, &self.metadata); let writer_props = props_builder.build();