fix: avoid truncating SST statistics during flush (#6977)

fix/disable-parquet-stats-truncate:
 - **Update `memcomparable` Dependency**: Switched from crates.io to a Git repository for `memcomparable` in `Cargo.lock`, `mito-codec/Cargo.toml`, and removed it from `mito2/Cargo.toml`.
 - **Enhance Parquet Writer Properties**: Added `set_statistics_truncate_length` and `set_column_index_truncate_length` to `WriterProperties` in `parquet.rs`, `bulk/part.rs`, `partition_tree/data.rs`, and `writer.rs`.
 - **Add Test for Corrupt Scan**: Introduced a new test module `scan_corrupt.rs` in `mito2/src/engine` to verify handling of corrupt data.
 - **Update Test Data**: Modified test data in `flush.rs` to reflect changes in file sizes and sequences.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-09-17 11:02:52 +08:00
committed by Weny Xu
parent a0587e2e87
commit 849ae8ebb6
10 changed files with 131 additions and 11 deletions

4
Cargo.lock generated
View File

@@ -7284,8 +7284,7 @@ checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]]
name = "memcomparable"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "376101dbd964fc502d5902216e180f92b3d003b5cc3d2e40e044eb5470fca677"
source = "git+https://github.com/v0y4g3r/memcomparable.git?rev=a07122dc03556bbd88ad66234cbea7efd3b23efb#a07122dc03556bbd88ad66234cbea7efd3b23efb"
dependencies = [
"bytes",
"serde",
@@ -7603,7 +7602,6 @@ dependencies = [
"itertools 0.14.0",
"lazy_static",
"log-store",
"memcomparable",
"mito-codec",
"moka",
"object-store",

View File

@@ -196,7 +196,10 @@ pub async fn stream_to_parquet(
concurrency: usize,
) -> Result<usize> {
let write_props = column_wise_config(
WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_statistics_truncate_length(None)
.set_column_index_truncate_length(None),
schema,
)
.build();

View File

@@ -127,10 +127,10 @@ mod tests {
r#"
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3429, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3413, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }"#
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3413, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }"#
);
// list from storage

View File

@@ -19,7 +19,7 @@ common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
memcomparable = "0.2"
memcomparable = { git = "https://github.com/v0y4g3r/memcomparable.git", rev = "a07122dc03556bbd88ad66234cbea7efd3b23efb" }
paste.workspace = true
serde.workspace = true
snafu.workspace = true

View File

@@ -50,7 +50,6 @@ index.workspace = true
itertools.workspace = true
lazy_static = "1.4"
log-store = { workspace = true }
memcomparable = "0.2"
mito-codec.workspace = true
moka = { workspace = true, features = ["sync", "future"] }
object-store.workspace = true

View File

@@ -53,6 +53,8 @@ mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_corrupt;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;

View File

@@ -0,0 +1,112 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::helper::row;
use api::v1::value::ValueData;
use api::v1::Rows;
use datatypes::value::Value;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};
use parquet::file::statistics::Statistics;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{PathType, RegionRequest};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::test_util;
use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_scan_corrupt() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("test_write_stats_with_long_string_value").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let table_dir = request.table_dir.clone();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let build_rows = |start: i32, end: i32| {
(start..end)
.map(|i| {
row(vec![
ValueData::StringValue(i.to_string().repeat(128)),
ValueData::F64Value(i as f64),
ValueData::TimestampMillisecondValue(i as i64 * 1000),
])
})
.collect()
};
let put_rows = async |start, end| {
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(start, end),
};
test_util::put_rows(&engine, region_id, rows).await;
test_util::flush_region(&engine, region_id, None).await;
};
put_rows(0, 3).await;
let region = engine.get_region(region_id).unwrap();
let version = region.version();
let file = version
.ssts
.levels()
.iter()
.flat_map(|l| l.files.values())
.next()
.unwrap();
let object_store = env.get_object_store().unwrap();
let reader = ParquetReaderBuilder::new(
table_dir.clone(),
PathType::Bare,
file.clone(),
object_store.clone(),
)
.build()
.await
.unwrap();
let codec = DensePrimaryKeyCodec::new(&version.metadata);
for r in reader.parquet_metadata().row_groups() {
for c in r.columns() {
if c.column_descr().name() == PRIMARY_KEY_COLUMN_NAME {
let stats = c.statistics().unwrap();
let Statistics::ByteArray(b) = stats else {
unreachable!()
};
let min = codec
.decode_leftmost(b.min_bytes_opt().unwrap())
.unwrap()
.unwrap();
assert_eq!(Value::String("0".repeat(128).into()), min);
let max = codec
.decode_leftmost(b.max_bytes_opt().unwrap())
.unwrap()
.unwrap();
assert_eq!(Value::String("2".repeat(128).into()), max);
}
}
}
}

View File

@@ -554,6 +554,8 @@ impl BulkPartEncoder {
WriterProperties::builder()
.set_write_batch_size(row_group_size)
.set_max_row_group_size(row_group_size)
.set_column_index_truncate_length(None)
.set_statistics_truncate_length(None)
.build(),
);
Self {

View File

@@ -774,7 +774,9 @@ impl<'a> DataPartEncoder<'a> {
.set_column_encoding(sequence_col.clone(), Encoding::DELTA_BINARY_PACKED)
.set_column_dictionary_enabled(sequence_col, false)
.set_column_encoding(op_type_col.clone(), Encoding::DELTA_BINARY_PACKED)
.set_column_dictionary_enabled(op_type_col, true);
.set_column_dictionary_enabled(op_type_col, true)
.set_column_index_truncate_length(None)
.set_statistics_truncate_length(None);
builder.build()
}

View File

@@ -329,7 +329,9 @@ where
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(opts.row_group_size);
.set_max_row_group_size(opts.row_group_size)
.set_column_index_truncate_length(None)
.set_statistics_truncate_length(None);
let props_builder = Self::customize_column_config(props_builder, &self.metadata);
let writer_props = props_builder.build();