From b1b76fde0b9e8b679addc98daddbc9dbe1746c81 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 3 Mar 2026 21:30:34 +0800 Subject: [PATCH] test(fuzz): repartition validation and add dedicated CI GC profile (#7703) * test(fuzz): add concurrent write loop and partition-bound value generation for repartition validation Signed-off-by: WenyXu * ci: run repartition fuzz target with dedicated local-wal GC config Signed-off-by: WenyXu * chore: fix typos Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * chore: fix typo Signed-off-by: WenyXu * count distinct timestamp value Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- .../with-minio-repartition-gc.yaml | 39 ++++ .github/workflows/develop.yml | 7 + tests-fuzz/src/generator.rs | 4 +- tests-fuzz/src/generator/insert_expr.rs | 15 +- tests-fuzz/src/ir.rs | 110 ++++++++++- tests-fuzz/src/ir/insert_expr.rs | 1 + tests-fuzz/src/ir/partition_expr.rs | 11 ++ tests-fuzz/src/validator/partition.rs | 12 +- .../targets/ddl/fuzz_repartition_table.rs | 184 +++++++++++++++++- 9 files changed, 369 insertions(+), 14 deletions(-) create mode 100644 .github/actions/setup-greptimedb-cluster/with-minio-repartition-gc.yaml diff --git a/.github/actions/setup-greptimedb-cluster/with-minio-repartition-gc.yaml b/.github/actions/setup-greptimedb-cluster/with-minio-repartition-gc.yaml new file mode 100644 index 0000000000..b836606ad6 --- /dev/null +++ b/.github/actions/setup-greptimedb-cluster/with-minio-repartition-gc.yaml @@ -0,0 +1,39 @@ +meta: + configData: |- + [runtime] + global_rt_size = 4 + + [datanode] + [datanode.client] + timeout = "120s" + + [gc] + enable = true +datanode: + configData: |- + [runtime] + global_rt_size = 4 + compact_rt_size = 2 + + [[region_engine]] + [region_engine.mito] + [region_engine.mito.gc] + enable = true + lingering_time = "0s" + unknown_file_lingering_time = "0s" +frontend: + configData: |- + [runtime] + global_rt_size = 4 + + [meta_client] + ddl_timeout = "120s" +objectStorage: + s3: + bucket: default + region: us-west-2 + root: test-root + endpoint: http://minio.minio.svc.cluster.local + credentials: + accessKeyId: rootuser + secretAccessKey: rootpass123 diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index c258eae3b2..0238e92c8d 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -316,6 +316,13 @@ jobs: minio: true kafka: true values: "with-remote-wal.yaml" + include: + - target: "fuzz_repartition_table" + mode: + name: "Local WAL Repartition GC" + minio: true + kafka: false + values: "with-minio-repartition-gc.yaml" steps: - name: Remove unused software run: | diff --git a/tests-fuzz/src/generator.rs b/tests-fuzz/src/generator.rs index 893efcff8a..e6a2965688 100644 --- a/tests-fuzz/src/generator.rs +++ b/tests-fuzz/src/generator.rs @@ -27,7 +27,7 @@ use rand::Rng; use crate::error::Error; use crate::ir::create_expr::ColumnOption; -use crate::ir::{AlterTableExpr, CreateTableExpr, Ident}; +use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident, RowValue}; pub type CreateTableExprGenerator = Box + Sync + Send>; @@ -44,6 +44,8 @@ pub type ValueGenerator = pub type TsValueGenerator = Box Value>; +pub type ValueOverride = Box Option>; + pub trait Generator { type Error: Sync + Send + fmt::Debug; diff --git a/tests-fuzz/src/generator/insert_expr.rs b/tests-fuzz/src/generator/insert_expr.rs index ededa53ddd..ea13c10f3a 100644 --- a/tests-fuzz/src/generator/insert_expr.rs +++ b/tests-fuzz/src/generator/insert_expr.rs @@ -23,7 +23,7 @@ use super::TsValueGenerator; use crate::context::TableContextRef; use crate::error::{Error, Result}; use crate::fake::WordGenerator; -use crate::generator::{Generator, Random, ValueGenerator}; +use crate::generator::{Generator, Random, ValueGenerator, ValueOverride}; use crate::ir::insert_expr::{InsertIntoExpr, RowValue}; use crate::ir::{Ident, generate_random_timestamp, generate_random_value}; @@ -34,6 +34,10 @@ pub struct InsertExprGenerator { table_ctx: TableContextRef, // Whether to omit all columns, i.e. INSERT INTO table_name VALUES (...) omit_column_list: bool, + #[builder(default)] + required_columns: Vec, + #[builder(default)] + value_overrides: Option>, #[builder(default = "1")] rows: usize, #[builder(default = "Box::new(WordGenerator)")] @@ -57,10 +61,11 @@ impl Generator for InsertExprGenerator { values_columns.clone_from(&self.table_ctx.columns); } else { for column in &self.table_ctx.columns { + let is_required = self.required_columns.contains(&column.name); let can_omit = column.is_nullable() || column.has_default_value(); // 50% chance to omit a column if it's not required - if !can_omit || rng.random_bool(0.5) { + if is_required || !can_omit || rng.random_bool(0.5) { values_columns.push(column.clone()); } } @@ -76,6 +81,12 @@ impl Generator for InsertExprGenerator { for _ in 0..self.rows { let mut row = Vec::with_capacity(values_columns.len()); for column in &values_columns { + if let Some(override_fn) = self.value_overrides.as_ref() + && let Some(value) = override_fn(column, rng) + { + row.push(value); + continue; + } if column.is_nullable() && rng.random_bool(0.2) { row.push(RowValue::Value(Value::Null)); continue; diff --git a/tests-fuzz/src/ir.rs b/tests-fuzz/src/ir.rs index 10fee5877a..e8c15dcf95 100644 --- a/tests-fuzz/src/ir.rs +++ b/tests-fuzz/src/ir.rs @@ -36,17 +36,19 @@ use datatypes::value::Value; use derive_builder::Builder; pub use insert_expr::InsertIntoExpr; use lazy_static::lazy_static; +pub use partition_expr::SimplePartitions; use rand::Rng; use rand::seq::{IndexedRandom, SliceRandom}; pub use repartition_expr::RepartitionExpr; use serde::{Deserialize, Serialize}; -use self::insert_expr::{RowValue, RowValues}; +use self::insert_expr::RowValues; use crate::context::TableContextRef; use crate::fake::WordGenerator; use crate::generator::{Random, TsValueGenerator}; use crate::impl_random; use crate::ir::create_expr::ColumnOption; +pub use crate::ir::insert_expr::RowValue; lazy_static! { pub static ref DATA_TYPES: Vec = vec![ @@ -168,8 +170,13 @@ pub fn generate_random_value( /// Generate monotonically increasing timestamps for MySQL. pub fn generate_unique_timestamp_for_mysql(base: i64) -> TsValueGenerator { let base = Timestamp::new_millisecond(base); - let clock = Arc::new(Mutex::new(base)); + generate_unique_timestamp_for_mysql_with_clock(Arc::new(Mutex::new(base))) +} +/// Generates a unique timestamp for MySQL. +pub fn generate_unique_timestamp_for_mysql_with_clock( + clock: Arc>, +) -> TsValueGenerator { Box::new(move |_rng, ts_type| -> Value { let mut clock = clock.lock().unwrap(); let ts = clock.add_duration(Duration::from_secs(1)).unwrap(); @@ -255,6 +262,105 @@ fn generate_random_date(rng: &mut R) -> Value { Value::from(Date::from(date)) } +/// Generates a partition value for the given column type and bounds. +pub fn generate_partition_value( + rng: &mut R, + column_type: &ConcreteDataType, + bounds: &[Value], + bound_idx: usize, +) -> Value { + if bounds.is_empty() { + return generate_random_value(rng, column_type, None); + } + let first = bounds.first().unwrap(); + let last = bounds.last().unwrap(); + match column_type { + datatypes::data_type::ConcreteDataType::Int16(_) => { + let first_value = match first { + datatypes::value::Value::Int16(v) => *v, + _ => 0, + }; + if bound_idx == 0 { + datatypes::value::Value::from(first_value.saturating_sub(1)) + } else if bound_idx < bounds.len() { + bounds[bound_idx - 1].clone() + } else { + last.clone() + } + } + datatypes::data_type::ConcreteDataType::Int32(_) => { + let first_value = match first { + datatypes::value::Value::Int32(v) => *v, + _ => 0, + }; + if bound_idx == 0 { + datatypes::value::Value::from(first_value.saturating_sub(1)) + } else if bound_idx < bounds.len() { + bounds[bound_idx - 1].clone() + } else { + last.clone() + } + } + datatypes::data_type::ConcreteDataType::Int64(_) => { + let first_value = match first { + datatypes::value::Value::Int64(v) => *v, + _ => 0, + }; + if bound_idx == 0 { + datatypes::value::Value::from(first_value.saturating_sub(1)) + } else if bound_idx < bounds.len() { + bounds[bound_idx - 1].clone() + } else { + last.clone() + } + } + datatypes::data_type::ConcreteDataType::Float32(_) => { + let first_value = match first { + datatypes::value::Value::Float32(v) => v.0, + _ => 0.0, + }; + if bound_idx == 0 { + datatypes::value::Value::from(first_value - 1.0) + } else if bound_idx < bounds.len() { + bounds[bound_idx - 1].clone() + } else { + last.clone() + } + } + datatypes::data_type::ConcreteDataType::Float64(_) => { + let first_value = match first { + datatypes::value::Value::Float64(v) => v.0, + _ => 0.0, + }; + if bound_idx == 0 { + datatypes::value::Value::from(first_value - 1.0) + } else if bound_idx < bounds.len() { + bounds[bound_idx - 1].clone() + } else { + last.clone() + } + } + datatypes::data_type::ConcreteDataType::String(_) => { + let upper = match first { + datatypes::value::Value::String(v) => v.as_utf8(), + _ => "", + }; + if bound_idx == 0 { + if upper <= "A" { + datatypes::value::Value::from("") + } else { + datatypes::value::Value::from("A") + } + } else if bound_idx < bounds.len() { + bounds[bound_idx - 1].clone() + } else { + last.clone() + } + } + _ => unimplemented!("unsupported partition column type: {column_type}"), + } +} + /// An identifier. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord, Hash)] pub struct Ident { diff --git a/tests-fuzz/src/ir/insert_expr.rs b/tests-fuzz/src/ir/insert_expr.rs index b7a8992a90..783b525b20 100644 --- a/tests-fuzz/src/ir/insert_expr.rs +++ b/tests-fuzz/src/ir/insert_expr.rs @@ -73,6 +73,7 @@ pub enum RowValue { } impl RowValue { + #[allow(clippy::should_implement_trait)] pub fn cmp(&self, other: &Self) -> Option { match (self, other) { (RowValue::Value(Value::Null), RowValue::Value(v2)) => v2.partial_cmp(&Value::Null), diff --git a/tests-fuzz/src/ir/partition_expr.rs b/tests-fuzz/src/ir/partition_expr.rs index 1357379a29..c91dd487ae 100644 --- a/tests-fuzz/src/ir/partition_expr.rs +++ b/tests-fuzz/src/ir/partition_expr.rs @@ -18,6 +18,7 @@ use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use rand::Rng; use snafu::ensure; +use crate::context::TableContext; use crate::error::{self, Result}; use crate::ir::{Ident, generate_random_value}; @@ -36,6 +37,7 @@ use crate::ir::{Ident, generate_random_value}; /// # Fields /// - `column_name`: The name of the column used for partitioning. /// - `bounds`: The partition boundary values; must be sorted for correct partitioning logic. +#[derive(Clone)] pub struct SimplePartitions { /// The column to partition by. pub column_name: Ident, @@ -136,6 +138,15 @@ impl SimplePartitions { Ok(Self::new(column_name, bounds)) } + /// Reconstructs a `SimplePartitions` instance from a `TableContext`. + pub fn from_table_ctx(table_ctx: &TableContext) -> Result { + let partition_def = table_ctx + .partition + .as_ref() + .expect("expected partition def"); + Self::from_exprs(partition_def.columns[0].clone(), &partition_def.exprs) + } + /// Inserts a new bound into the partition bounds and returns the index of the new bound. pub fn insert_bound(&mut self, bound: Value) -> Result { ensure!( diff --git a/tests-fuzz/src/validator/partition.rs b/tests-fuzz/src/validator/partition.rs index 932f635d8f..3fbc8e7f86 100644 --- a/tests-fuzz/src/validator/partition.rs +++ b/tests-fuzz/src/validator/partition.rs @@ -52,6 +52,10 @@ pub async fn fetch_partitions_info_schema( }) } +fn normalize(s: &str) -> String { + s.replace("\\\"", "\"").replace("\\\\", "\\") +} + /// Asserts the partitions are equal to the expected partitions. pub fn assert_partitions(expected: &PartitionDef, actual: &[PartitionInfo]) -> Result<()> { ensure!( @@ -69,17 +73,17 @@ pub fn assert_partitions(expected: &PartitionDef, actual: &[PartitionInfo]) -> R for expr in expected_exprs { let actual_expr = actual .iter() - .find(|info| info.partition_description == expr); + .find(|info| normalize(&info.partition_description) == normalize(&expr)); ensure!( actual_expr.is_some(), error::AssertSnafu { reason: format!( - "Expected partition expression: {expr} not found, actual: {:?}", + "Expected partition expression: '{expr:?}' not found, actual: {:?}", actual .iter() - .map(|info| info.partition_description.clone()) + .map(|info| format!("'{:?}'", info.partition_description.clone())) .collect::>() - .join(", ") + .join("; ") ), } ); diff --git a/tests-fuzz/targets/ddl/fuzz_repartition_table.rs b/tests-fuzz/targets/ddl/fuzz_repartition_table.rs index d861cfd4b7..d4b9e9fd7a 100644 --- a/tests-fuzz/targets/ddl/fuzz_repartition_table.rs +++ b/tests-fuzz/targets/ddl/fuzz_repartition_table.rs @@ -14,15 +14,18 @@ #![no_main] -use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use arbitrary::{Arbitrary, Unstructured}; -use common_telemetry::info; +use common_telemetry::{info, warn}; +use common_time::Timestamp; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; use rand_chacha::ChaChaRng; use snafu::ResultExt; -use sqlx::{MySql, Pool}; +use sqlx::{Executor, MySql, Pool}; use tests_fuzz::context::{TableContext, TableContextRef}; use tests_fuzz::error::{self, Result}; use tests_fuzz::fake::{ @@ -31,19 +34,26 @@ use tests_fuzz::fake::{ }; use tests_fuzz::generator::Generator; use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder; +use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; use tests_fuzz::generator::repartition_expr::{ MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder, }; -use tests_fuzz::ir::{CreateTableExpr, MySQLTsColumnTypeGenerator, RepartitionExpr}; +use tests_fuzz::ir::{ + CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, RepartitionExpr, RowValue, + SimplePartitions, generate_partition_value, generate_unique_timestamp_for_mysql_with_clock, +}; use tests_fuzz::translator::DslTranslator; use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; +use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; use tests_fuzz::translator::mysql::repartition_expr::RepartitionExprTranslator; use tests_fuzz::utils::{ Connections, get_fuzz_override, get_gt_fuzz_input_max_alter_actions, init_greptime_connections_via_env, }; use tests_fuzz::validator; +use tests_fuzz::validator::row::count_values; +#[derive(Clone)] struct FuzzContext { greptime: Pool, } @@ -96,6 +106,114 @@ fn generate_create_expr( create_table_generator.generate(rng) } +struct SharedState { + table_ctx: TableContextRef, + clock: Arc>, + inserted_rows: u64, + running: bool, +} + +fn build_insert_expr( + table_ctx: &TableContextRef, + rng: &mut R, + partitions: &SimplePartitions, + clock: &Arc>, +) -> InsertIntoExpr { + let ts_value_generator = generate_unique_timestamp_for_mysql_with_clock(clock.clone()); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + let partition_len = table_ctx.partition.as_ref().unwrap().exprs.len(); + let row = rng.random_range(partition_len..partition_len * 2); + + let moved_partitions = partitions.clone(); + let insert_generator = InsertExprGeneratorBuilder::default() + .table_ctx(table_ctx.clone()) + .omit_column_list(false) + .rows(row) + .required_columns(vec![partitions.column_name.clone()]) + .value_overrides(Some(Box::new(move |column, rng| { + if column.name.value == moved_partitions.column_name.value { + let bound_idx = counter_clone.fetch_add(1, Ordering::Relaxed) % partition_len; + + return Some(RowValue::Value(generate_partition_value( + rng, + &column.column_type, + &moved_partitions.bounds, + bound_idx, + ))); + } + None + }))) + .ts_value_generator(ts_value_generator) + .build() + .unwrap(); + insert_generator.generate(rng).unwrap() +} + +async fn execute_insert_with_retry(ctx: &FuzzContext, sql: &str) -> Result<()> { + let mut delay = Duration::from_millis(100); + let mut attempt = 0; + let max_attempts = 10; + loop { + match ctx + .greptime + // unprepared query, see + .execute(sql) + .await + { + Ok(_) => { + return Ok(()); + } + Err(err) => { + tokio::time::sleep(delay).await; + delay = std::cmp::min(delay * 2, Duration::from_secs(1)); + attempt += 1; + warn!("Execute insert with retry: {sql}, attempt: {attempt}, error: {err:?}"); + if attempt >= max_attempts { + return Err(err).context(error::ExecuteQuerySnafu { sql }); + } + } + } + } +} + +async fn write_loop( + mut rng: R, + ctx: FuzzContext, + shared_state: Arc>, +) -> Result<()> { + info!("Start write loop"); + let clock = shared_state.lock().unwrap().clock.clone(); + loop { + let (is_running, table_ctx) = { + let state = shared_state.lock().unwrap(); + (state.running, state.table_ctx.clone()) + }; + + if !is_running { + break; + } + + let partitions = SimplePartitions::from_table_ctx(&table_ctx).unwrap(); + let insert_expr = build_insert_expr(&table_ctx, &mut rng, &partitions, &clock); + + let new_inserted_rows = insert_expr.values_list.len() as u64; + let translator = InsertIntoExprTranslator; + let sql = translator.translate(&insert_expr)?; + + let now = Instant::now(); + execute_insert_with_retry(&ctx, &sql).await?; + info!("Execute insert sql: {sql}, elapsed: {:?}", now.elapsed()); + { + let mut state = shared_state.lock().unwrap(); + state.inserted_rows += new_inserted_rows; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + info!("Write loop ended"); + Ok(()) +} + async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> { info!("input: {input:?}"); let mut rng = ChaChaRng::seed_from_u64(input.seed); @@ -111,6 +229,15 @@ async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result // Repartition table let mut table_ctx = Arc::new(TableContext::from(&expr)); + let shared_state = Arc::new(Mutex::new(SharedState { + table_ctx: table_ctx.clone(), + clock: Arc::new(Mutex::new(Timestamp::current_millis())), + running: true, + inserted_rows: 0, + })); + + let writer_rng = ChaChaRng::seed_from_u64(input.seed); + let writer_task = tokio::spawn(write_loop(writer_rng, ctx.clone(), shared_state.clone())); for i in 0..input.actions { let partition_num = table_ctx.partition.as_ref().unwrap().exprs.len(); info!( @@ -129,6 +256,8 @@ async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result .context(error::ExecuteQuerySnafu { sql: &sql })?; info!("result: {result:?}"); table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).repartition(expr).unwrap()); + // Updates the table partition in the shared state. + shared_state.lock().unwrap().table_ctx = table_ctx.clone(); // Validates partition expression let partition_entries = validator::partition::fetch_partitions_info_schema( @@ -137,13 +266,58 @@ async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result &table_ctx.name, ) .await?; + info!( + "partition_entries:\n{}", + partition_entries + .iter() + .map(|entry| { + format!( + r#" - table_catalog: {} + table_schema: {} + table_name: {} + partition_name: {} + partition_expression: {} + partition_description: {} + greptime_partition_id: {} + partition_ordinal_position: {}"#, + entry.table_catalog, + entry.table_schema, + entry.table_name, + entry.partition_name, + entry.partition_expression, + entry.partition_description, + entry.greptime_partition_id, + entry.partition_ordinal_position, + ) + }) + .collect::>() + .join("\n") + ); validator::partition::assert_partitions( table_ctx.partition.as_ref().unwrap(), &partition_entries, )?; - // TODO(weny): inserts data and validates the data } + shared_state.lock().unwrap().running = false; + writer_task.await.unwrap().unwrap(); + let count_sql = format!("SELECT COUNT(1) AS count FROM {}", table_ctx.name); + let count = count_values(&ctx.greptime, &count_sql).await?; + assert_eq!( + count.count as usize, + shared_state.lock().unwrap().inserted_rows as usize + ); + let timestamp_column_name = table_ctx.timestamp_column().unwrap().name.clone(); + // Since each timestamp value is unique, the count of distinct timestamps should match the total row count. + let distinct_count_sql = format!( + "SELECT COUNT(DISTINCT {}) AS count FROM {}", + timestamp_column_name, table_ctx.name + ); + let distinct_count = count_values(&ctx.greptime, &distinct_count_sql).await?; + assert_eq!( + distinct_count.count as usize, + shared_state.lock().unwrap().inserted_rows as usize + ); // Cleans up let table_name = table_ctx.name.clone();