diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml
index 0238e92c8d..b6ab0f8926 100644
--- a/.github/workflows/develop.yml
+++ b/.github/workflows/develop.yml
@@ -319,7 +319,13 @@ jobs:
         include:
           - target: "fuzz_repartition_table"
             mode:
-              name: "Local WAL Repartition GC"
+              name: "Local WAL mito table repartition"
+              minio: true
+              kafka: false
+              values: "with-minio-repartition-gc.yaml"
+          - target: "fuzz_repartition_metric_table"
+            mode:
+              name: "Local WAL metric table repartition"
               minio: true
               kafka: false
               values: "with-minio-repartition-gc.yaml"
@@ -455,6 +461,14 @@ jobs:
           path: /tmp/fuzz-monitor-dumps
           if-no-files-found: warn
           retention-days: 3
+      - name: Upload CSV dumps
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: fuzz-tests-csv-dumps-${{ matrix.mode.name }}-${{ matrix.target }}
+          path: /tmp/greptime-fuzz-dumps
+          if-no-files-found: warn
+          retention-days: 3
       - name: Delete cluster
         if: success()
         shell: bash
diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml
index a537ca0687..bc687092c0 100644
--- a/tests-fuzz/Cargo.toml
+++ b/tests-fuzz/Cargo.toml
@@ -100,6 +100,13 @@ test = false
 bench = false
 doc = false
 
+[[bin]]
+name = "fuzz_repartition_metric_table"
+path = "targets/ddl/fuzz_repartition_metric_table.rs"
+test = false
+bench = false
+doc = false
+
 [[bin]]
 name = "fuzz_alter_table"
 path = "targets/ddl/fuzz_alter_table.rs"
diff --git a/tests-fuzz/README.md b/tests-fuzz/README.md
index 6807e19a1c..cc9d7eb84e 100644
--- a/tests-fuzz/README.md
+++ b/tests-fuzz/README.md
@@ -66,3 +66,23 @@ GT_FUZZ_OVERRIDE_SEED=6666 GT_FUZZ_OVERRIDE_ACTIONS=175 cargo fuzz run fuzz_targ
 ```
 
 For more details, visit [cargo fuzz](https://rust-fuzz.github.io/book/cargo-fuzz/tutorial.html) or run the command `cargo fuzz --help`.
+
+## Repartition Metric Dump Artifacts
+
+For `fuzz_repartition_metric_table`, dump artifacts are written under a single run directory.
+
+- Table data snapshots: `<table_name>.table-data.csv`
+- SQL traces per logical table: `<table_name>.trace.sql`
+- Seed metadata: `seed.meta`
+
+SQL trace behavior:
+
+- Insert SQL is appended after successful execution, with comment fields including
+  `started_at_ms` and `elapsed_ms`.
+- Repartition events are broadcast to all logical table trace files, with comment fields including
+  `action_idx`, `started_at_ms`, `elapsed_ms`, and the SQL text.
+
+Run directory lifecycle:
+
+- On success, the run directory is cleaned up.
+- On failure, the run directory is retained for CI/local diffing.
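For orientation, here is an illustrative trace excerpt in the format the README section above describes. The table name, SQL, and timing values are hypothetical; the comment layout follows the `sql_dump_writer` tests added later in this patch:

```sql
-- kind=insert table=metric_a started_at_ms=1700000000000 elapsed_ms=12
INSERT INTO metric_a VALUES (1);
-- repartition action_idx=1 started_at_ms=1700000000100 elapsed_ms=85 sql=ALTER TABLE phy ...
ALTER TABLE phy ...;
```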
diff --git a/tests-fuzz/src/fake.rs b/tests-fuzz/src/fake.rs
index aa92e0293a..8910a39206 100644
--- a/tests-fuzz/src/fake.rs
+++ b/tests-fuzz/src/fake.rs
@@ -65,6 +65,26 @@ where
     _v: PhantomData<V>,
 }
 
+pub struct ConstGenerator<V> {
+    value: V,
+}
+
+impl<V> ConstGenerator<V> {
+    pub fn new(value: V) -> Self {
+        Self { value }
+    }
+}
+
+impl<V, R> Random<V, R> for ConstGenerator<V>
+where
+    R: Rng,
+    V: Clone,
+{
+    fn choose(&self, _rng: &mut R, amount: usize) -> Vec<V> {
+        vec![self.value.clone(); amount]
+    }
+}
+
 pub fn random_capitalize_map<R: Rng + 'static>(rng: &mut R, s: Ident) -> Ident {
     let mut v = s.value.chars().collect::<Vec<_>>();
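A minimal sketch of how `ConstGenerator` pins an otherwise randomized field, as the new fuzz target below does for the physical table name. The builder call there passes it as `name_generator`; this standalone test is illustrative only:

```rust
use rand::rng;
use tests_fuzz::fake::ConstGenerator;
use tests_fuzz::generator::Random;
use tests_fuzz::ir::Ident;

#[test]
fn const_generator_pins_table_name() {
    // Every call to `choose` yields clones of the same identifier, so
    // repeated fuzz runs always target the same physical table.
    let mut r = rng();
    let name_generator = ConstGenerator::new(Ident::new("fuzz_repartition_metric_physical"));
    let names = name_generator.choose(&mut r, 3);
    assert!(names.iter().all(|n| n.value == "fuzz_repartition_metric_physical"));
}
```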
diff --git a/tests-fuzz/src/generator/create_expr.rs b/tests-fuzz/src/generator/create_expr.rs
index fae6a95eda..261a310db2 100644
--- a/tests-fuzz/src/generator/create_expr.rs
+++ b/tests-fuzz/src/generator/create_expr.rs
@@ -193,6 +193,26 @@ fn generate_partition_def(
     }
 }
 
+fn generate_metric_partition(partitions: usize) -> Option<(Column, PartitionDef)> {
+    if partitions <= 1 {
+        return None;
+    }
+
+    let partition_column = Column {
+        name: Ident::new("host"),
+        column_type: ConcreteDataType::string_datatype(),
+        options: vec![ColumnOption::PrimaryKey],
+    };
+    let bounds = generate_partition_bounds(&partition_column.column_type, partitions - 1);
+    let partitions = SimplePartitions::new(partition_column.name.clone(), bounds);
+    let partition_def = PartitionDef {
+        columns: vec![partitions.column_name.clone()],
+        exprs: partitions.generate().unwrap(),
+    };
+
+    Some((partition_column, partition_def))
+}
+
 /// Generate a physical table with 2 columns: ts of TimestampType::Millisecond as time index and val of Float64Type.
 #[derive(Builder)]
 #[builder(pattern = "owned")]
@@ -201,6 +221,8 @@ pub struct CreatePhysicalTableExprGenerator<R: Rng + 'static> {
     name_generator: Box<dyn Random<Ident, R>>,
     #[builder(default = "false")]
     if_not_exists: bool,
+    #[builder(default = "0")]
+    partition: usize,
     #[builder(default, setter(into))]
     with_clause: HashMap<String, String>,
 }
@@ -215,25 +237,35 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreatePhysicalTableExprGenerator<R> {
             options.insert(key.clone(), Value::from(value.clone()));
         }
 
+        let mut columns = vec![
+            Column {
+                name: Ident::new("ts"),
+                column_type: ConcreteDataType::timestamp_millisecond_datatype(),
+                options: vec![ColumnOption::TimeIndex],
+            },
+            Column {
+                name: Ident::new("val"),
+                column_type: ConcreteDataType::float64_datatype(),
+                options: vec![],
+            },
+        ];
+
+        let mut partition = None;
+        let mut primary_keys = vec![];
+        if let Some((partition_column, partition_def)) = generate_metric_partition(self.partition) {
+            columns.push(partition_column);
+            partition = Some(partition_def);
+            primary_keys.push(columns.len() - 1);
+        }
+
         Ok(CreateTableExpr {
             table_name: self.name_generator.generate(rng),
-            columns: vec![
-                Column {
-                    name: Ident::new("ts"),
-                    column_type: ConcreteDataType::timestamp_millisecond_datatype(),
-                    options: vec![ColumnOption::TimeIndex],
-                },
-                Column {
-                    name: Ident::new("val"),
-                    column_type: ConcreteDataType::float64_datatype(),
-                    options: vec![],
-                },
-            ],
+            columns,
             if_not_exists: self.if_not_exists,
-            partition: None,
+            partition,
             engine: "metric".to_string(),
             options,
-            primary_keys: vec![],
+            primary_keys,
         })
     }
 }
@@ -245,6 +277,8 @@ pub struct CreateLogicalTableExprGenerator<R: Rng + 'static> {
     physical_table_ctx: TableContextRef,
     labels: usize,
     if_not_exists: bool,
+    #[builder(default = "true")]
+    include_partition_column: bool,
     #[builder(default = "Box::new(WordGenerator)")]
     name_generator: Box<dyn Random<Ident, R>>,
 }
@@ -253,11 +287,11 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateLogicalTableExprGenerator<R> {
     type Error = Error;
 
     fn generate(&self, rng: &mut R) -> Result<CreateTableExpr> {
-        // Currently we mock the usage of GreptimeDB as Prometheus' backend, the physical table must have two columns.
+        // Currently we mock the usage of GreptimeDB as Prometheus' backend; the physical table must have ts and val.
         ensure!(
-            self.physical_table_ctx.columns.len() == 2,
+            self.physical_table_ctx.columns.len() >= 2,
             error::UnexpectedSnafu {
-                violated: "The physical table must have two columns"
+                violated: "The physical table must have at least two columns"
             }
         );
@@ -265,9 +299,16 @@
         let logical_table_name = self
             .physical_table_ctx
             .generate_unique_table_name(rng, self.name_generator.as_ref());
+        let mut physical_columns = self.physical_table_ctx.columns.clone();
+        if !self.include_partition_column
+            && let Some(partition_def) = &self.physical_table_ctx.partition
+        {
+            physical_columns.retain(|column| !partition_def.columns.contains(&column.name));
+        }
+
         let mut logical_table = CreateTableExpr {
             table_name: logical_table_name,
-            columns: self.physical_table_ctx.columns.clone(),
+            columns: physical_columns,
             if_not_exists: self.if_not_exists,
             partition: None,
             engine: "metric".to_string(),
@@ -459,6 +500,58 @@ mod tests {
         }));
     }
 
+    #[test]
+    fn test_create_physical_table_expr_generator_with_partition() {
+        let mut rng = rand::rng();
+        let physical_table_expr = CreatePhysicalTableExprGeneratorBuilder::default()
+            .partition(3)
+            .if_not_exists(false)
+            .build()
+            .unwrap()
+            .generate(&mut rng)
+            .unwrap();
+
+        assert_eq!(physical_table_expr.engine, "metric");
+        assert!(physical_table_expr.partition.is_some());
+        assert_eq!(physical_table_expr.partition.unwrap().exprs.len(), 3);
+    }
+
+    #[test]
+    fn test_create_logical_table_expr_generator_without_partition_column() {
+        let mut rng = rand::rng();
+        let physical_table_expr = CreatePhysicalTableExprGeneratorBuilder::default()
+            .partition(3)
+            .if_not_exists(false)
+            .build()
+            .unwrap()
+            .generate(&mut rng)
+            .unwrap();
+        let partition_columns = physical_table_expr
+            .partition
+            .as_ref()
+            .unwrap()
+            .columns
+            .clone();
+        let physical_table_ctx = Arc::new(TableContext::from(&physical_table_expr));
+
+        let logical_table_expr = CreateLogicalTableExprGeneratorBuilder::default()
+            .physical_table_ctx(physical_table_ctx)
+            .labels(3)
+            .include_partition_column(false)
+            .if_not_exists(false)
+            .build()
+            .unwrap()
+            .generate(&mut rng)
+            .unwrap();
+
+        assert!(
+            logical_table_expr
+                .columns
+                .iter()
+                .all(|column| !partition_columns.contains(&column.name))
+        );
+    }
+
     #[test]
     fn test_create_logical_table_expr_generator_deterministic() {
         let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0);
diff --git a/tests-fuzz/src/ir.rs b/tests-fuzz/src/ir.rs
index e8c15dcf95..ce1628cd61 100644
--- a/tests-fuzz/src/ir.rs
+++ b/tests-fuzz/src/ir.rs
@@ -20,6 +20,7 @@ pub(crate) mod insert_expr;
 pub(crate) mod partition_expr;
 pub(crate) mod repartition_expr;
 pub(crate) mod select_expr;
+pub(crate) mod string_value;
 
 use core::fmt;
 use std::collections::HashMap;
@@ -126,20 +127,7 @@ pub fn generate_partition_bounds(datatype: &ConcreteDataType, bounds: usize) -> Vec<Value> {
         ConcreteDataType::Int64(_) => generate_values!(i64, bounds),
         ConcreteDataType::Float32(_) => generate_values!(f32, bounds),
         ConcreteDataType::Float64(_) => generate_values!(f64, bounds),
-        ConcreteDataType::String(_) => {
-            let base = b'A';
-            let range = b'z' - b'A';
-            let step = range / (bounds as u8 + 1);
-            (1..=bounds)
-                .map(|i| {
-                    Value::from(
-                        char::from(base + step * i as u8)
-                            .escape_default()
-                            .to_string(),
-                    )
-                })
-                .collect()
-        }
+        ConcreteDataType::String(_) => string_value::generate_partition_bounds(bounds),
         _ => unimplemented!("unsupported type: {datatype}"),
     }
 }
@@ -157,10 +145,7 @@ pub fn generate_random_value(
         ConcreteDataType::Int64(_) => Value::from(rng.random::<i64>()),
         ConcreteDataType::Float32(_) => Value::from(rng.random::<f32>()),
         ConcreteDataType::Float64(_) => Value::from(rng.random::<f64>()),
-        ConcreteDataType::String(_) => match random_str {
-            Some(random) => Value::from(random.generate(rng).value),
-            None => Value::from(rng.random::<char>().to_string()),
-        },
+        ConcreteDataType::String(_) => string_value::generate_data_string_value(rng, random_str),
         ConcreteDataType::Date(_) => generate_random_date(rng),
         _ => unimplemented!("unsupported type: {datatype}"),
@@ -341,21 +326,7 @@ pub fn generate_partition_value(
             }
         }
         datatypes::data_type::ConcreteDataType::String(_) => {
-            let upper = match first {
-                datatypes::value::Value::String(v) => v.as_utf8(),
-                _ => "",
-            };
-            if bound_idx == 0 {
-                if upper <= "A" {
-                    datatypes::value::Value::from("")
-                } else {
-                    datatypes::value::Value::from("A")
-                }
-            } else if bound_idx < bounds.len() {
-                bounds[bound_idx - 1].clone()
-            } else {
-                last.clone()
-            }
+            string_value::generate_partition_value(bounds, bound_idx)
         }
         _ => unimplemented!("unsupported partition column type: {column_type}"),
     }
diff --git a/tests-fuzz/src/ir/partition_expr.rs b/tests-fuzz/src/ir/partition_expr.rs
index c91dd487ae..908223366c 100644
--- a/tests-fuzz/src/ir/partition_expr.rs
+++ b/tests-fuzz/src/ir/partition_expr.rs
@@ -20,7 +20,7 @@ use snafu::ensure;
 
 use crate::context::TableContext;
 use crate::error::{self, Result};
-use crate::ir::{Ident, generate_random_value};
+use crate::ir::{Ident, generate_random_value, string_value};
 
 /// A partitioning scheme that divides a single column into multiple ranges based on provided bounds.
 ///
@@ -245,6 +245,10 @@ pub fn generate_unique_bound<R: Rng>(
     datatype: &ConcreteDataType,
     bounds: &[Value],
 ) -> Result<Value> {
+    if matches!(datatype, ConcreteDataType::String(_)) {
+        return string_value::generate_unique_partition_bound(rng, bounds);
+    }
+
     for _ in 0..16 {
         let candidate = generate_random_value(rng, datatype, None);
         if !bounds.contains(&candidate) {
diff --git a/tests-fuzz/src/ir/string_value.rs b/tests-fuzz/src/ir/string_value.rs
new file mode 100644
index 0000000000..6a53aa69de
--- /dev/null
+++ b/tests-fuzz/src/ir/string_value.rs
@@ -0,0 +1,162 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use datatypes::value::Value;
+use rand::Rng;
+
+use crate::error::{self, Result};
+use crate::generator::Random;
+use crate::ir::Ident;
+
+const READABLE_CHARSET: &[u8] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+fn readable_token(index: usize) -> String {
+    let base = READABLE_CHARSET.len();
+    let mut n = index + 1;
+    let mut buf = Vec::new();
+
+    while n > 0 {
+        let rem = (n - 1) % base;
+        buf.push(READABLE_CHARSET[rem] as char);
+        n = (n - 1) / base;
+    }
+
+    buf.iter().rev().collect()
+}
+
+pub fn generate_data_string_value<R: Rng>(
+    rng: &mut R,
+    random_str: Option<&dyn Random<Ident, R>>,
+) -> Value {
+    match random_str {
+        Some(random) => Value::from(random.generate(rng).value),
+        None => {
+            let idx = rng.random_range(0..(READABLE_CHARSET.len() * READABLE_CHARSET.len() * 4));
+            Value::from(readable_token(idx))
+        }
+    }
+}
+
+/// Generates ordered readable string bounds for partition expressions.
+pub fn generate_partition_bounds(bounds: usize) -> Vec<Value> {
+    let token_space = READABLE_CHARSET.len() * READABLE_CHARSET.len() * 1024;
+    (1..=bounds)
+        .map(|i| {
+            let idx = i * token_space / (bounds + 1);
+            Value::from(readable_token(idx))
+        })
+        .collect()
+}
+
+/// Picks a representative string value for the target partition range.
+pub fn generate_partition_value(bounds: &[Value], bound_idx: usize) -> Value {
+    let first = bounds.first().unwrap();
+    let last = bounds.last().unwrap();
+    let upper = match first {
+        Value::String(v) => v.as_utf8(),
+        _ => "",
+    };
+
+    if bound_idx == 0 {
+        if upper <= "0" {
+            Value::from("")
+        } else {
+            Value::from("0")
+        }
+    } else if bound_idx < bounds.len() {
+        bounds[bound_idx - 1].clone()
+    } else {
+        last.clone()
+    }
+}
+
+/// Generates a unique readable bound not present in existing bounds.
+pub fn generate_unique_partition_bound<R: Rng>(rng: &mut R, bounds: &[Value]) -> Result<Value> {
+    let search_space = READABLE_CHARSET.len() * READABLE_CHARSET.len() * 1024;
+    let start = rng.random_range(0..search_space);
+    for offset in 0..search_space {
+        let idx = start + offset;
+        let candidate = Value::from(readable_token(idx));
+        if !bounds.contains(&candidate) {
+            return Ok(candidate);
+        }
+    }
+
+    error::UnexpectedSnafu {
+        violated: "unable to generate unique string partition bound".to_string(),
+    }
+    .fail()
+}
+
+#[cfg(test)]
+mod tests {
+    use rand::SeedableRng;
+    use rand_chacha::ChaCha8Rng;
+
+    use super::*;
+
+    #[test]
+    fn test_readable_token_grows_length() {
+        assert_eq!("0", readable_token(0));
+        assert_eq!("9", readable_token(9));
+        assert_eq!("A", readable_token(10));
+        assert_eq!("z", readable_token(61));
+        assert_eq!("00", readable_token(62));
+    }
+
+    #[test]
+    fn test_generate_partition_bounds_are_readable_and_unique() {
+        let bounds = generate_partition_bounds(8);
+        assert_eq!(8, bounds.len());
+
+        let mut values = bounds
+            .iter()
+            .map(|v| match v {
+                Value::String(s) => s.as_utf8().to_string(),
+                _ => panic!("expected string value"),
+            })
+            .collect::<Vec<_>>();
+        let mut dedup = values.clone();
+        dedup.sort();
+        dedup.dedup();
+        assert_eq!(values.len(), dedup.len());
+
+        for s in values.drain(..) {
+            assert!(s.chars().all(|c| c.is_ascii_alphanumeric()));
+        }
+    }
+
+    #[test]
+    fn test_generate_partition_value_for_string_bounds() {
+        let bounds = vec![Value::from("A"), Value::from("M")];
+        assert_eq!(Value::from("0"), generate_partition_value(&bounds, 0));
+        assert_eq!(Value::from("A"), generate_partition_value(&bounds, 1));
+        assert_eq!(Value::from("M"), generate_partition_value(&bounds, 2));
+    }
+
+    #[test]
+    fn test_generate_unique_partition_bound_not_in_existing() {
+        let mut rng = ChaCha8Rng::seed_from_u64(42);
+        let bounds = vec![Value::from("0"), Value::from("1"), Value::from("2")];
+        let candidate = generate_unique_partition_bound(&mut rng, &bounds).unwrap();
+        assert!(!bounds.contains(&candidate));
+        match candidate {
+            Value::String(s) => {
+                assert!(!s.as_utf8().is_empty());
+                assert!(s.as_utf8().chars().all(|c| c.is_ascii_alphanumeric()));
+            }
+            _ => panic!("expected string value"),
+        }
+    }
+}
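A note on the ordering assumption behind `generate_partition_bounds`: `readable_token` is a bijective base-62 numbering, and tokens of different lengths do not sort lexicographically by index ("z" precedes "00" by index but follows it as a string). The `i * token_space / (bounds + 1)` spacing keeps every index deep in the four-character range for the small bound counts used here, so equal-length tokens sort the same way their indices do. A self-contained sketch of that arithmetic, duplicating `readable_token` for illustration only:

```rust
const CHARSET: &[u8] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

// Bijective base-62: index 0 -> "0", 61 -> "z", 62 -> "00", 63 -> "01", ...
fn token(index: usize) -> String {
    let base = CHARSET.len();
    let mut n = index + 1; // shift to 1-based for the bijective encoding
    let mut buf = Vec::new();
    while n > 0 {
        buf.push(CHARSET[(n - 1) % base] as char);
        n = (n - 1) / base;
    }
    buf.iter().rev().collect()
}

fn main() {
    assert_eq!(token(0), "0");
    assert_eq!(token(62), "00");
    assert_eq!(token(63), "01");
    // Indices spaced inside the four-character range share a length,
    // so their lexicographic order matches their index order.
    assert!(token(400_000) < token(800_000));
}
```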
diff --git a/tests-fuzz/src/translator.rs b/tests-fuzz/src/translator.rs
index 673b543f2c..4c5e0bb6a4 100644
--- a/tests-fuzz/src/translator.rs
+++ b/tests-fuzz/src/translator.rs
@@ -13,6 +13,8 @@
 // limitations under the License.
 
 mod common;
+/// Translator that converts insert expressions into CSV records.
+pub mod csv;
 pub mod mysql;
 pub mod postgres;
diff --git a/tests-fuzz/src/translator/csv.rs b/tests-fuzz/src/translator/csv.rs
new file mode 100644
index 0000000000..e95956862c
--- /dev/null
+++ b/tests-fuzz/src/translator/csv.rs
@@ -0,0 +1,121 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::error::Error;
+use crate::ir::insert_expr::{InsertIntoExpr, RowValue};
+use crate::translator::DslTranslator;
+
+/// One CSV record converted from an insert row.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CsvRecord {
+    /// Cell values in column order.
+    pub values: Vec<String>,
+}
+
+/// CSV records converted from an insert expression.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CsvRecords {
+    /// Target table name from insert expression.
+    pub table_name: String,
+    /// Header values from insert columns.
+    pub headers: Vec<String>,
+    /// Converted row records.
+    pub records: Vec<CsvRecord>,
+}
+
+/// Translates `InsertIntoExpr` into CSV-writer-ready records.
+pub struct InsertExprToCsvRecordsTranslator;
+
+impl DslTranslator<InsertIntoExpr, CsvRecords> for InsertExprToCsvRecordsTranslator {
+    type Error = Error;
+
+    fn translate(&self, input: &InsertIntoExpr) -> Result<CsvRecords, Self::Error> {
+        let headers = input
+            .columns
+            .iter()
+            .map(|column| column.name.to_string())
+            .collect::<Vec<_>>();
+        let records = input
+            .values_list
+            .iter()
+            .map(|row| CsvRecord {
+                values: row.iter().map(Self::format_row_value).collect(),
+            })
+            .collect::<Vec<_>>();
+
+        Ok(CsvRecords {
+            table_name: input.table_name.to_string(),
+            headers,
+            records,
+        })
+    }
+}
+
+impl InsertExprToCsvRecordsTranslator {
+    fn format_row_value(value: &RowValue) -> String {
+        match value {
+            RowValue::Value(datatypes::value::Value::Null) => String::new(),
+            RowValue::Value(v) => v.to_string(),
+            RowValue::Default => "DEFAULT".to_string(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use datatypes::data_type::ConcreteDataType;
+
+    use super::InsertExprToCsvRecordsTranslator;
+    use crate::ir::create_expr::ColumnOption;
+    use crate::ir::insert_expr::{InsertIntoExpr, RowValue};
+    use crate::ir::{Column, Ident};
+    use crate::translator::DslTranslator;
+
+    #[test]
+    fn test_translate_insert_expr_to_csv_records() {
+        let input = InsertIntoExpr {
+            table_name: Ident::new("metric_a"),
+            omit_column_list: false,
+            columns: vec![
+                Column {
+                    name: "host".into(),
+                    column_type: ConcreteDataType::string_datatype(),
+                    options: vec![ColumnOption::PrimaryKey],
+                },
+                Column {
+                    name: "value".into(),
+                    column_type: ConcreteDataType::float64_datatype(),
+                    options: vec![],
+                },
+            ],
+            values_list: vec![
+                vec![
+                    RowValue::Value(datatypes::value::Value::String("web-1".into())),
+                    RowValue::Value(datatypes::value::Value::Int32(15)),
+                ],
+                vec![
+                    RowValue::Value(datatypes::value::Value::Null),
+                    RowValue::Default,
+                ],
+            ],
+        };
+
+        let output = InsertExprToCsvRecordsTranslator.translate(&input).unwrap();
+        assert_eq!(output.table_name, "metric_a");
+        assert_eq!(output.headers, vec!["host", "value"]);
+        assert_eq!(output.records.len(), 2);
+        assert_eq!(output.records[0].values, vec!["web-1", "15"]);
+        assert_eq!(output.records[1].values, vec!["", "DEFAULT"]);
+    }
+}
diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs
index 0780f6c93d..d55abab3c2 100644
--- a/tests-fuzz/src/utils.rs
+++ b/tests-fuzz/src/utils.rs
@@ -15,6 +15,8 @@
 pub mod cluster_info;
 pub mod config;
 pub mod crd;
+/// CSV dump writer utilities for fuzz tests.
+pub mod csv_dump_writer;
 pub mod health;
 pub mod migration;
 pub mod partition;
@@ -22,10 +24,15 @@ pub mod pod_failure;
 pub mod procedure;
 #[cfg(feature = "unstable")]
 pub mod process;
+pub mod retry;
+/// SQL dump writer utilities for fuzz tests.
+pub mod sql_dump_writer;
 pub mod wait;
 
 use std::env;
+use std::str::FromStr;
 
+use common_base::readable_size::ReadableSize;
 use common_telemetry::info;
 use common_telemetry::tracing::log::LevelFilter;
 use paste::paste;
@@ -126,6 +133,14 @@ pub const GT_FUZZ_INPUT_MAX_COLUMNS: &str = "GT_FUZZ_INPUT_MAX_COLUMNS";
 pub const GT_FUZZ_INPUT_MAX_ALTER_ACTIONS: &str = "GT_FUZZ_INPUT_MAX_ALTER_ACTIONS";
 pub const GT_FUZZ_INPUT_MAX_INSERT_ACTIONS: &str = "GT_FUZZ_INPUT_MAX_INSERT_ACTIONS";
 pub const FUZZ_OVERRIDE_PREFIX: &str = "GT_FUZZ_OVERRIDE_";
+/// Enables CSV dump generation for fuzz runs.
+pub const GT_FUZZ_DUMP_TABLE_CSV: &str = "GT_FUZZ_DUMP_TABLE_CSV";
+/// Base directory for CSV dump sessions.
+pub const GT_FUZZ_DUMP_DIR: &str = "GT_FUZZ_DUMP_DIR";
+/// Directory suffix used by one CSV dump session.
+pub const GT_FUZZ_DUMP_SUFFIX: &str = "GT_FUZZ_DUMP_SUFFIX";
+/// Max in-memory CSV buffer size before auto flush.
+pub const GT_FUZZ_DUMP_BUFFER_MAX_BYTES: &str = "GT_FUZZ_DUMP_BUFFER_MAX_BYTES";
 
 /// Reads an override value for a fuzz parameter from env `GT_FUZZ_OVERRIDE_<NAME>`.
 pub fn get_fuzz_override<T>(name: &str) -> Option<T>
@@ -137,6 +152,33 @@ where
     env::var(&key).ok().and_then(|v| v.parse().ok())
 }
 
+/// Returns CSV dump base directory.
+pub fn get_gt_fuzz_dump_dir() -> String {
+    let _ = dotenv::dotenv();
+    env::var(GT_FUZZ_DUMP_DIR).unwrap_or_else(|_| "/tmp/greptime-fuzz-dumps".to_string())
+}
+
+/// Returns CSV dump directory suffix.
+pub fn get_gt_fuzz_dump_suffix() -> String {
+    let _ = dotenv::dotenv();
+    env::var(GT_FUZZ_DUMP_SUFFIX).unwrap_or_else(|_| ".repartition-metric-csv".to_string())
+}
+
+/// Returns max CSV in-memory buffer size.
+pub fn get_gt_fuzz_dump_buffer_max_bytes() -> usize {
+    let _ = dotenv::dotenv();
+    env::var(GT_FUZZ_DUMP_BUFFER_MAX_BYTES)
+        .ok()
+        .and_then(|value| {
+            value.parse::<usize>().ok().or_else(|| {
+                ReadableSize::from_str(&value)
+                    .ok()
+                    .map(|size| size.as_bytes() as usize)
+            })
+        })
+        .unwrap_or(8 * 1024 * 1024)
+}
+
 macro_rules! make_get_from_env_helper {
     ($key:expr, $default: expr) => {
         paste! {
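`get_gt_fuzz_dump_buffer_max_bytes` accepts either a plain byte count or a human-readable size string. A sketch of the expected parsing behavior, assuming `ReadableSize` follows its usual binary-unit grammar where `"8MB"` means 8 MiB:

```rust
use std::str::FromStr;

use common_base::readable_size::ReadableSize;

// Mirrors the env parsing above: plain integers are raw bytes, anything
// else falls back to ReadableSize strings such as "8MB" or "512KB".
fn parse_buffer_limit(value: &str) -> Option<usize> {
    value.parse::<usize>().ok().or_else(|| {
        ReadableSize::from_str(value)
            .ok()
            .map(|size| size.as_bytes() as usize)
    })
}

fn main() {
    assert_eq!(parse_buffer_limit("4096"), Some(4096));
    assert_eq!(parse_buffer_limit("8MB"), Some(8 * 1024 * 1024));
}
```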
diff --git a/tests-fuzz/src/utils/csv_dump_writer.rs b/tests-fuzz/src/utils/csv_dump_writer.rs
new file mode 100644
index 0000000000..de16a23c24
--- /dev/null
+++ b/tests-fuzz/src/utils/csv_dump_writer.rs
@@ -0,0 +1,383 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::fs::{File, OpenOptions, create_dir_all, remove_dir_all};
+use std::io::Write;
+use std::path::{Path, PathBuf};
+
+use common_telemetry::{info, warn};
+use common_time::util::current_time_millis;
+use snafu::ResultExt;
+
+use crate::error::{self, Result};
+use crate::translator::csv::CsvRecords;
+use crate::utils::{
+    get_gt_fuzz_dump_buffer_max_bytes, get_gt_fuzz_dump_dir, get_gt_fuzz_dump_suffix,
+};
+
+/// Metadata for one CSV dump session.
+#[derive(Debug, Clone)]
+pub struct CsvDumpMetadata {
+    /// Fuzz target name.
+    pub target: String,
+    /// Seed used by current fuzz input.
+    pub seed: u64,
+    /// Repartition action count.
+    pub actions: usize,
+    /// Initial partition count.
+    pub partitions: usize,
+    /// Logical table count.
+    pub tables: usize,
+    /// Session start time in unix milliseconds.
+    pub started_at_unix_ms: i64,
+}
+
+impl CsvDumpMetadata {
+    /// Builds dump metadata with current timestamp.
+    pub fn new(
+        target: impl Into<String>,
+        seed: u64,
+        actions: usize,
+        partitions: usize,
+        tables: usize,
+    ) -> Self {
+        Self {
+            target: target.into(),
+            seed,
+            actions,
+            partitions,
+            tables,
+            started_at_unix_ms: current_time_millis(),
+        }
+    }
+}
+
+/// Session writer for staged CSV dump records.
+#[derive(Debug)]
+pub struct CsvDumpSession {
+    /// Session metadata.
+    pub metadata: CsvDumpMetadata,
+    /// Session directory path.
+    pub run_dir: PathBuf,
+    /// Max in-memory buffer size before auto flush.
+    pub max_buffer_bytes: usize,
+    records: Vec<CsvRecords>,
+    buffered_bytes: usize,
+    written_tables: HashSet<String>,
+    full_headers_by_table: HashMap<String, Vec<String>>,
+}
+
+impl CsvDumpSession {
+    /// Creates session directory and writes seed metadata file.
+    pub fn new(metadata: CsvDumpMetadata) -> Result<Self> {
+        Self::new_with_buffer_limit(metadata, get_gt_fuzz_dump_buffer_max_bytes())
+    }
+
+    /// Creates session with a custom in-memory buffer limit.
+    pub fn new_with_buffer_limit(
+        metadata: CsvDumpMetadata,
+        max_buffer_bytes: usize,
+    ) -> Result<Self> {
+        let run_dir = build_run_dir(&metadata);
+        create_dir_all(&run_dir).context(error::CreateFileSnafu {
+            path: run_dir.to_string_lossy().to_string(),
+        })?;
+        write_seed_meta(&run_dir, &metadata)?;
+        info!(
+            "Create csv dump session, target: {}, run_dir: {}, max_buffer_bytes: {}",
+            metadata.target,
+            run_dir.display(),
+            max_buffer_bytes
+        );
+
+        Ok(Self {
+            metadata,
+            run_dir,
+            max_buffer_bytes,
+            records: Vec::new(),
+            buffered_bytes: 0,
+            written_tables: HashSet::new(),
+            full_headers_by_table: HashMap::new(),
+        })
+    }
+
+    /// Appends one table CSV records batch with full table headers.
+    pub fn append(&mut self, records: CsvRecords, full_headers: Vec<String>) -> Result<()> {
+        self.full_headers_by_table
+            .entry(records.table_name.clone())
+            .or_insert(full_headers);
+        self.buffered_bytes += estimate_csv_records_size(&records);
+        self.records.push(records);
+        if self.buffered_bytes >= self.max_buffer_bytes {
+            self.flush_buffered_records()?;
+        }
+        Ok(())
+    }
+
+    /// Flushes all appended batches to CSV files.
+    pub fn flush_all(&mut self) -> Result<()> {
+        self.flush_buffered_records()
+    }
+
+    /// Removes session directory after successful validation.
+    pub fn cleanup_on_success(&self) -> std::io::Result<()> {
+        match remove_dir_all(&self.run_dir) {
+            Ok(_) => {
+                info!(
+                    "Cleanup csv dump directory on success: {}",
+                    self.run_dir.display()
+                );
+                Ok(())
+            }
+            Err(err) => {
+                warn!(
+                    "Cleanup csv dump directory failed: {}, error: {:?}",
+                    self.run_dir.display(),
+                    err
+                );
+                Err(err)
+            }
+        }
+    }
+
+    fn flush_buffered_records(&mut self) -> Result<()> {
+        if self.records.is_empty() {
+            return Ok(());
+        }
+        for batch in &self.records {
+            write_batch_csv(
+                &self.run_dir,
+                batch,
+                &mut self.written_tables,
+                &self.full_headers_by_table,
+            )?;
+        }
+        self.records.clear();
+        self.buffered_bytes = 0;
+        Ok(())
+    }
+}
+
+fn write_seed_meta(run_dir: &Path, metadata: &CsvDumpMetadata) -> Result<()> {
+    let path = run_dir.join("seed.meta");
+    let mut file = File::create(&path).context(error::CreateFileSnafu {
+        path: path.to_string_lossy().to_string(),
+    })?;
+
+    let content = format!(
+        "target={}\nseed={}\nactions={}\npartitions={}\ntables={}\nstarted_at_unix_ms={}\n",
+        metadata.target,
+        metadata.seed,
+        metadata.actions,
+        metadata.partitions,
+        metadata.tables,
+        metadata.started_at_unix_ms,
+    );
+    file.write_all(content.as_bytes())
+        .context(error::WriteFileSnafu {
+            path: path.to_string_lossy().to_string(),
+        })
+}
+
+fn write_batch_csv(
+    run_dir: &Path,
+    batch: &CsvRecords,
+    written_tables: &mut HashSet<String>,
+    full_headers_by_table: &HashMap<String, Vec<String>>,
+) -> Result<()> {
+    let output_headers = full_headers_by_table
+        .get(&batch.table_name)
+        .cloned()
+        .unwrap_or_else(|| batch.headers.clone());
+    let file_name = format!("{}.table-data.csv", sanitize_file_name(&batch.table_name));
+    let path = run_dir.join(file_name);
+    let mut file = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(&path)
+        .context(error::CreateFileSnafu {
+            path: path.to_string_lossy().to_string(),
+        })?;
+
+    if written_tables.insert(batch.table_name.clone()) {
+        file.write_all(join_line(&output_headers).as_bytes())
+            .context(error::WriteFileSnafu {
+                path: path.to_string_lossy().to_string(),
+            })?;
+        file.write_all(b"\n").context(error::WriteFileSnafu {
+            path: path.to_string_lossy().to_string(),
+        })?;
+    }
+
+    let header_index = batch
+        .headers
+        .iter()
+        .enumerate()
+        .map(|(idx, header)| (header.as_str(), idx))
+        .collect::<HashMap<_, _>>();
+
+    for record in &batch.records {
+        let aligned_values = output_headers
+            .iter()
+            .map(|header| {
+                header_index
+                    .get(header.as_str())
+                    .and_then(|idx| record.values.get(*idx))
+                    .cloned()
+                    .unwrap_or_default()
+            })
+            .collect::<Vec<_>>();
+        file.write_all(join_line(&aligned_values).as_bytes())
+            .context(error::WriteFileSnafu {
+                path: path.to_string_lossy().to_string(),
+            })?;
+        file.write_all(b"\n").context(error::WriteFileSnafu {
+            path: path.to_string_lossy().to_string(),
+        })?;
+    }
+
+    Ok(())
+}
+
+fn estimate_csv_records_size(records: &CsvRecords) -> usize {
+    let headers = records.headers.iter().map(String::len).sum::<usize>();
+    let rows = records
+        .records
+        .iter()
+        .flat_map(|record| record.values.iter())
+        .map(String::len)
+        .sum::<usize>();
+    headers + rows
+}
+
+fn join_line(cells: &[String]) -> String {
+    cells
+        .iter()
+        .map(|cell| escape_csv_cell(cell))
+        .collect::<Vec<_>>()
+        .join(",")
+}
+
+fn escape_csv_cell(value: &str) -> String {
+    if value.contains([',', '"', '\n', '\r']) {
+        format!("\"{}\"", value.replace('"', "\"\""))
+    } else {
+        value.to_string()
+    }
+}
+
+fn sanitize_file_name(raw: &str) -> String {
+    raw.chars()
+        .map(|ch| {
+            if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
+                ch
+            } else {
+                '_'
+            }
+        })
+        .collect()
+}
+
+fn build_run_dir(metadata: &CsvDumpMetadata) -> PathBuf {
+    let base = PathBuf::from(get_gt_fuzz_dump_dir());
+    let suffix = get_gt_fuzz_dump_suffix();
+    let name = format!(
+        "{}_seed_{}_actions_{}_ts_{}{}",
+        metadata.target, metadata.seed, metadata.actions, metadata.started_at_unix_ms, suffix
+    );
+    base.join(name)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{CsvDumpMetadata, CsvDumpSession};
+    use crate::translator::csv::{CsvRecord, CsvRecords};
+
+    #[test]
+    fn test_create_session_and_flush() {
+        let mut session = CsvDumpSession::new_with_buffer_limit(
+            CsvDumpMetadata::new("fuzz_case", 1, 2, 3, 4),
+            1024,
+        )
+        .unwrap();
+        session
+            .append(
+                CsvRecords {
+                    table_name: "metric-a".to_string(),
+                    headers: vec!["host".to_string(), "value".to_string()],
+                    records: vec![CsvRecord {
+                        values: vec!["web-1".to_string(), "10".to_string()],
+                    }],
+                },
+                vec!["host".to_string(), "value".to_string()],
+            )
+            .unwrap();
+        session.flush_all().unwrap();
+
+        assert!(session.run_dir.exists());
+        assert!(session.run_dir.join("seed.meta").exists());
+        assert!(session.run_dir.join("metric-a.table-data.csv").exists());
+    }
+
+    #[test]
+    fn test_auto_flush_on_buffer_limit() {
+        let mut session =
+            CsvDumpSession::new_with_buffer_limit(CsvDumpMetadata::new("fuzz_case", 5, 2, 3, 4), 1)
+                .unwrap();
+        session
+            .append(
+                CsvRecords {
+                    table_name: "metric-b".to_string(),
+                    headers: vec!["host".to_string()],
+                    records: vec![CsvRecord {
+                        values: vec!["web-2".to_string()],
+                    }],
+                },
+                vec!["host".to_string()],
+            )
+            .unwrap();
+
+        assert!(session.run_dir.join("metric-b.table-data.csv").exists());
+        assert_eq!(session.buffered_bytes, 0);
+    }
+
+    #[test]
+    fn test_flush_with_partial_headers_uses_full_headers() {
+        let mut session = CsvDumpSession::new_with_buffer_limit(
+            CsvDumpMetadata::new("fuzz_case", 7, 2, 3, 4),
+            1024,
+        )
+        .unwrap();
+        session
+            .append(
+                CsvRecords {
+                    table_name: "metric-c".to_string(),
+                    headers: vec!["host".to_string(), "value".to_string()],
+                    records: vec![CsvRecord {
+                        values: vec!["web-3".to_string(), "12".to_string()],
+                    }],
+                },
+                vec!["host".to_string(), "idc".to_string(), "value".to_string()],
+            )
+            .unwrap();
+        session.flush_all().unwrap();
+
+        let file =
+            std::fs::read_to_string(session.run_dir.join("metric-c.table-data.csv")).unwrap();
+        let mut lines = file.lines();
+        assert_eq!(lines.next().unwrap(), "host,idc,value");
+        assert_eq!(lines.next().unwrap(), "web-3,,12");
+    }
+}
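`escape_csv_cell` above follows the usual RFC 4180 quoting convention: cells containing a comma, quote, or newline are wrapped in double quotes, with embedded quotes doubled. A standalone sketch restating that rule with concrete inputs (values are illustrative):

```rust
// Mirrors escape_csv_cell from csv_dump_writer above.
fn escape(value: &str) -> String {
    if value.contains([',', '"', '\n', '\r']) {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}

fn main() {
    assert_eq!(escape("plain"), "plain");
    assert_eq!(escape("a,b"), "\"a,b\"");
    assert_eq!(escape("say \"hi\""), "\"say \"\"hi\"\"\"");
}
```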
diff --git a/tests-fuzz/src/utils/retry.rs b/tests-fuzz/src/utils/retry.rs
new file mode 100644
index 0000000000..06d1ede54f
--- /dev/null
+++ b/tests-fuzz/src/utils/retry.rs
@@ -0,0 +1,49 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::future::Future;
+use std::time::Duration;
+
+use common_telemetry::warn;
+
+pub async fn retry_with_backoff<F, Fut, T, E>(
+    mut operation: F,
+    max_attempts: usize,
+    init_backoff: Duration,
+    max_backoff: Duration,
+) -> Result<T, E>
+where
+    F: FnMut() -> Fut,
+    Fut: Future<Output = Result<T, E>>,
+    E: std::fmt::Debug,
+{
+    let mut backoff = init_backoff;
+    for attempt in 0..max_attempts {
+        match operation().await {
+            Ok(result) => return Ok(result),
+            Err(err) if attempt + 1 == max_attempts => return Err(err),
+            Err(err) => {
+                let current_attempt = attempt + 1;
+                warn!(
+                    "Retryable operation failed, attempt: {}, max_attempts: {}, backoff: {:?}, error: {:?}",
+                    current_attempt, max_attempts, backoff, err
+                );
+                tokio::time::sleep(backoff).await;
+                backoff = std::cmp::min(backoff * 2, max_backoff);
+            }
+        }
+    }
+
+    panic!("retry loop should always return")
+}
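A minimal sketch of how `retry_with_backoff` is meant to be driven: the `FnMut` closure recreates its future on every attempt, so captured state can be mutated between attempts. The fuzz target's `validate_rows` uses the same pattern with `count_values`; the failure-injection here is hypothetical:

```rust
use std::time::Duration;

use tests_fuzz::utils::retry::retry_with_backoff;

async fn demo() -> Result<u32, String> {
    let mut attempts = 0u32;
    retry_with_backoff(
        || {
            attempts += 1;
            let attempt = attempts;
            async move {
                // Fail the first two attempts, then succeed.
                if attempt < 3 {
                    Err(format!("transient error, attempt {attempt}"))
                } else {
                    Ok(attempt)
                }
            }
        },
        5,                          // max_attempts
        Duration::from_millis(10),  // init_backoff
        Duration::from_millis(100), // max_backoff (doubling is capped here)
    )
    .await
}
```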
diff --git a/tests-fuzz/src/utils/sql_dump_writer.rs b/tests-fuzz/src/utils/sql_dump_writer.rs
new file mode 100644
index 0000000000..6f098d9584
--- /dev/null
+++ b/tests-fuzz/src/utils/sql_dump_writer.rs
@@ -0,0 +1,267 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::fs::{OpenOptions, create_dir_all};
+use std::io::Write;
+use std::path::PathBuf;
+
+use snafu::ResultExt;
+
+use crate::error::{self, Result};
+use crate::utils::get_gt_fuzz_dump_buffer_max_bytes;
+
+/// Session writer for table-scoped SQL trace files.
+#[derive(Debug)]
+pub struct SqlDumpSession {
+    /// Session directory path.
+    pub run_dir: PathBuf,
+    /// Max in-memory buffer size before auto flush.
+    pub max_buffer_bytes: usize,
+    buffered_bytes: usize,
+    entries_by_table: HashMap<String, Vec<String>>,
+}
+
+impl SqlDumpSession {
+    /// Creates SQL dump session with default buffer limit.
+    pub fn new(run_dir: PathBuf) -> Result<Self> {
+        Self::new_with_buffer_limit(run_dir, get_gt_fuzz_dump_buffer_max_bytes())
+    }
+
+    /// Creates SQL dump session with custom buffer limit.
+    pub fn new_with_buffer_limit(run_dir: PathBuf, max_buffer_bytes: usize) -> Result<Self> {
+        create_dir_all(&run_dir).context(error::CreateFileSnafu {
+            path: run_dir.to_string_lossy().to_string(),
+        })?;
+
+        Ok(Self {
+            run_dir,
+            max_buffer_bytes,
+            buffered_bytes: 0,
+            entries_by_table: HashMap::new(),
+        })
+    }
+
+    /// Appends one SQL statement for a logical table.
+    pub fn append_sql(&mut self, table: &str, sql: &str, comment: Option<&str>) -> Result<()> {
+        let entry = format_sql_entry(sql, comment);
+        self.push_entry(table, entry)?;
+        Ok(())
+    }
+
+    /// Broadcasts one comment event to all table trace files.
+    pub fn broadcast_event<I, T>(&mut self, tables: I, event: &str, sql: &str) -> Result<()>
+    where
+        I: IntoIterator<Item = T>,
+        T: AsRef<str>,
+    {
+        let entry = format_sql_entry(sql, Some(event));
+        for table in tables {
+            self.push_entry(table.as_ref(), entry.clone())?;
+        }
+        Ok(())
+    }
+
+    /// Flushes all staged SQL traces to table-scoped files.
+    pub fn flush_all(&mut self) -> Result<()> {
+        self.flush_buffered_entries()
+    }
+
+    fn push_entry(&mut self, table: &str, entry: String) -> Result<()> {
+        self.buffered_bytes += entry.len();
+        self.entries_by_table
+            .entry(table.to_string())
+            .or_default()
+            .push(entry);
+
+        if self.buffered_bytes >= self.max_buffer_bytes {
+            self.flush_buffered_entries()?;
+        }
+        Ok(())
+    }
+
+    fn flush_buffered_entries(&mut self) -> Result<()> {
+        if self.entries_by_table.is_empty() {
+            return Ok(());
+        }
+
+        for (table, entries) in &self.entries_by_table {
+            let path = self
+                .run_dir
+                .join(format!("{}.trace.sql", sanitize_file_name(table)));
+            let mut file = OpenOptions::new()
+                .create(true)
+                .append(true)
+                .open(&path)
+                .context(error::CreateFileSnafu {
+                    path: path.to_string_lossy().to_string(),
+                })?;
+
+            for entry in entries {
+                file.write_all(entry.as_bytes())
+                    .context(error::WriteFileSnafu {
+                        path: path.to_string_lossy().to_string(),
+                    })?;
+                file.write_all(b"\n").context(error::WriteFileSnafu {
+                    path: path.to_string_lossy().to_string(),
+                })?;
+            }
+        }
+
+        self.entries_by_table.clear();
+        self.buffered_bytes = 0;
+        Ok(())
+    }
+}
+
+fn format_sql_entry(sql: &str, comment: Option<&str>) -> String {
+    let normalized_sql = normalize_sql(sql);
+    if let Some(comment) = comment {
+        format!("{}\n{normalized_sql}", format_comment(comment))
+    } else {
+        normalized_sql
+    }
+}
+
+fn format_comment(comment: &str) -> String {
+    comment
+        .lines()
+        .map(|line| format!("-- {line}"))
+        .collect::<Vec<_>>()
+        .join("\n")
+}
+
+fn normalize_sql(sql: &str) -> String {
+    let trimmed = sql.trim_end();
+    if trimmed.ends_with(';') {
+        trimmed.to_string()
+    } else {
+        format!("{trimmed};")
+    }
+}
+
+fn sanitize_file_name(raw: &str) -> String {
+    raw.chars()
+        .map(|ch| {
+            if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
+                ch
+            } else {
+                '_'
+            }
+        })
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::{SystemTime, UNIX_EPOCH};
+
+    use super::SqlDumpSession;
+
+    #[test]
+    fn test_append_sql_writes_table_trace_file() {
+        let run_dir = std::env::temp_dir().join(format!(
+            "tests-fuzz-sql-dump-{}",
+            SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis()
+        ));
+
+        let mut session = SqlDumpSession::new_with_buffer_limit(run_dir.clone(), 1024).unwrap();
+        session
+            .append_sql(
+                "metric-a",
+                "INSERT INTO t VALUES(1)",
+                Some("kind=insert elapsed_ms=10"),
+            )
+            .unwrap();
+        session.flush_all().unwrap();
+
+        let content = std::fs::read_to_string(run_dir.join("metric-a.trace.sql")).unwrap();
+        assert!(content.contains("-- kind=insert elapsed_ms=10"));
+        assert!(content.contains("INSERT INTO t VALUES(1);"));
+    }
+
+    #[test]
+    fn test_broadcast_event_writes_to_all_tables() {
+        let run_dir = std::env::temp_dir().join(format!(
+            "tests-fuzz-sql-broadcast-{}",
+            SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis()
+        ));
+
+        let mut session = SqlDumpSession::new_with_buffer_limit(run_dir.clone(), 1024).unwrap();
+        session
+            .broadcast_event(
+                ["metric-a", "metric-b"],
+                "repartition action_idx=3",
+                "ALTER TABLE t REPARTITION",
+            )
+            .unwrap();
+        session.flush_all().unwrap();
+
+        let content_a = std::fs::read_to_string(run_dir.join("metric-a.trace.sql")).unwrap();
+        let content_b = std::fs::read_to_string(run_dir.join("metric-b.trace.sql")).unwrap();
+        assert!(content_a.contains("-- repartition action_idx=3"));
+        assert!(content_a.contains("ALTER TABLE t REPARTITION;"));
+        assert!(content_b.contains("-- repartition action_idx=3"));
+        assert!(content_b.contains("ALTER TABLE t REPARTITION;"));
+    }
+
+    #[test]
+    fn test_multiline_comment_is_prefixed_per_line() {
+        let run_dir = std::env::temp_dir().join(format!(
+            "tests-fuzz-sql-dump-comment-{}",
+            SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis()
+        ));
+
+        let mut session = SqlDumpSession::new_with_buffer_limit(run_dir.clone(), 1024).unwrap();
+        session
+            .append_sql(
+                "metric-a",
+                "INSERT INTO t VALUES(1)",
+                Some("kind=insert\nstarted_at_ms=1 elapsed_ms=2"),
+            )
+            .unwrap();
+        session.flush_all().unwrap();
+
+        let content = std::fs::read_to_string(run_dir.join("metric-a.trace.sql")).unwrap();
+        assert!(content.contains("-- kind=insert\n-- started_at_ms=1 elapsed_ms=2"));
+    }
+
+    #[test]
+    fn test_auto_flush_on_buffer_limit() {
+        let run_dir = std::env::temp_dir().join(format!(
+            "tests-fuzz-sql-dump-limit-{}",
+            SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis()
+        ));
+
+        let mut session = SqlDumpSession::new_with_buffer_limit(run_dir.clone(), 1).unwrap();
+        session
+            .append_sql("metric-a", "INSERT INTO t VALUES(1)", None)
+            .unwrap();
+
+        assert!(run_dir.join("metric-a.trace.sql").exists());
+        assert_eq!(session.buffered_bytes, 0);
+    }
+}
diff --git a/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs b/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs
new file mode 100644
index 0000000000..7932bc7759
--- /dev/null
+++ b/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs
@@ -0,0 +1,684 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![no_main]
+
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use arbitrary::{Arbitrary, Unstructured};
+use common_telemetry::{info, warn};
+use common_time::Timestamp;
+use common_time::util::current_time_millis;
+use libfuzzer_sys::fuzz_target;
+use rand::{Rng, SeedableRng};
+use rand_chacha::ChaChaRng;
+use snafu::{ResultExt, ensure};
+use sqlx::{MySql, Pool};
+use tests_fuzz::context::{TableContext, TableContextRef};
+use tests_fuzz::error::{self, Result};
+use tests_fuzz::fake::{
+    ConstGenerator, MappedGenerator, WordGenerator, merge_two_word_map_fn, random_capitalize_map,
+    uppercase_and_keyword_backtick_map,
+};
+use tests_fuzz::generator::Generator;
+use tests_fuzz::generator::create_expr::{
+    CreateLogicalTableExprGeneratorBuilder, CreatePhysicalTableExprGeneratorBuilder,
+};
+use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
+use tests_fuzz::generator::repartition_expr::{
+    MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder,
+};
+use tests_fuzz::ir::{
+    CreateTableExpr, Ident, InsertIntoExpr, RepartitionExpr, generate_random_value,
+    generate_unique_timestamp_for_mysql_with_clock,
+};
+use tests_fuzz::translator::DslTranslator;
+use tests_fuzz::translator::csv::InsertExprToCsvRecordsTranslator;
+use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
+use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
+use tests_fuzz::translator::mysql::repartition_expr::RepartitionExprTranslator;
+use tests_fuzz::utils::csv_dump_writer::{CsvDumpMetadata, CsvDumpSession};
+use tests_fuzz::utils::retry::retry_with_backoff;
+use tests_fuzz::utils::sql_dump_writer::SqlDumpSession;
+use tests_fuzz::utils::{
+    Connections, get_fuzz_override, get_gt_fuzz_input_max_alter_actions,
+    get_gt_fuzz_input_max_tables, init_greptime_connections_via_env,
+};
+use tests_fuzz::validator::row::count_values;
+use tokio::sync::{mpsc, oneshot};
+
+const BARRIER_ACK_TIMEOUT_SECS: u64 = 10;
+const VALIDATE_QUERY_MAX_ATTEMPTS: usize = 6;
+const VALIDATE_QUERY_INIT_BACKOFF: Duration = Duration::from_millis(50);
+const VALIDATE_QUERY_MAX_BACKOFF: Duration = Duration::from_millis(800);
+
+#[derive(Clone)]
+struct FuzzContext {
+    greptime: Pool<MySql>,
+}
+
+impl FuzzContext {
+    async fn close(self) {
+        self.greptime.close().await;
+    }
+}
+
+#[derive(Clone, Debug)]
+struct FuzzInput {
+    seed: u64,
+    actions: usize,
+    partitions: usize,
+    tables: usize,
+}
+
+fn generate_create_physical_table_expr<R: Rng + 'static>(
+    partitions: usize,
+    rng: &mut R,
+) -> Result<CreateTableExpr> {
+    CreatePhysicalTableExprGeneratorBuilder::default()
+        .name_generator(Box::new(ConstGenerator::new(Ident::new(
+            "fuzz_repartition_metric_physical",
+        ))))
+        .if_not_exists(rng.random_bool(0.5))
+        .partition(partitions)
+        .build()
+        .unwrap()
+        .generate(rng)
+}
+
+fn generate_create_logical_table_expr<R: Rng + 'static>(
+    physical_table_ctx: TableContextRef,
+    include_partition_column: bool,
+    rng: &mut R,
+) -> Result<CreateTableExpr> {
+    CreateLogicalTableExprGeneratorBuilder::default()
+        .name_generator(Box::new(MappedGenerator::new(
+            WordGenerator,
+            merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
+        )))
+        .physical_table_ctx(physical_table_ctx)
+        .labels(rng.random_range(1..=5))
+        .if_not_exists(rng.random_bool(0.5))
+        .include_partition_column(include_partition_column)
+        .build()
+        .unwrap()
+        .generate(rng)
+}
+
+fn generate_insert_expr<R: Rng + 'static>(
+    rows: usize,
+    rng: &mut R,
+    table_ctx: TableContextRef,
+    clock: Arc<Mutex<Timestamp>>,
+) -> Result<InsertIntoExpr> {
+    let ts_value_generator = generate_unique_timestamp_for_mysql_with_clock(clock);
+    InsertExprGeneratorBuilder::default()
+        .omit_column_list(false)
+        .table_ctx(table_ctx)
+        .rows(rows)
+        .value_generator(Box::new(generate_random_value))
+        .ts_value_generator(ts_value_generator)
+        .build()
+        .unwrap()
+        .generate(rng)
+}
+
+async fn create_metric_tables<R: Rng + 'static>(
+    ctx: &FuzzContext,
+    rng: &mut R,
+    partitions: usize,
+    table_count: usize,
+) -> Result<(
+    TableContextRef,
+    BTreeMap<Ident, TableContextRef>,
+    HashMap<String, String>,
+    String,
+)> {
+    let create_physical_expr = generate_create_physical_table_expr(partitions, rng)?;
+    let translator = CreateTableExprTranslator;
+    let create_physical_sql = translator.translate(&create_physical_expr)?;
+    let result = sqlx::query(&create_physical_sql)
+        .execute(&ctx.greptime)
+        .await
+        .context(error::ExecuteQuerySnafu {
+            sql: &create_physical_sql,
+        })?;
+    info!("Create physical table: {create_physical_sql}, result: {result:?}");
+    let physical_table_ctx = Arc::new(TableContext::from(&create_physical_expr));
+    ensure!(
+        physical_table_ctx.partition.is_some(),
+        error::AssertSnafu {
+            reason: "Physical metric table must have partition".to_string()
+        }
+    );
+
+    let mut logical_tables = BTreeMap::new();
+    let mut create_logical_sqls = HashMap::new();
+    let max_attempts = table_count * 3;
+    for _ in 0..max_attempts {
+        if logical_tables.len() >= table_count {
+            break;
+        }
+
+        let include_partition_column = rng.random_bool(0.5);
+        let create_logical_expr = generate_create_logical_table_expr(
+            physical_table_ctx.clone(),
+            include_partition_column,
+            rng,
+        )?;
+        if logical_tables.contains_key(&create_logical_expr.table_name) {
+            continue;
+        }
+
+        let create_logical_sql = translator.translate(&create_logical_expr)?;
+        let result = sqlx::query(&create_logical_sql)
+            .execute(&ctx.greptime)
+            .await
+            .context(error::ExecuteQuerySnafu {
+                sql: &create_logical_sql,
+            })?;
+        info!("Create logical table: {create_logical_sql}, result: {result:?}");
+        let logical_ctx = Arc::new(TableContext::from(&create_logical_expr));
+        create_logical_sqls.insert(logical_ctx.name.to_string(), create_logical_sql);
+        logical_tables.insert(logical_ctx.name.clone(), logical_ctx);
+    }
+
+    ensure!(
+        !logical_tables.is_empty(),
+        error::AssertSnafu {
+            reason: "No logical table created".to_string()
+        }
+    );
+
+    Ok((
+        physical_table_ctx,
+        logical_tables,
+        create_logical_sqls,
+        create_physical_sql,
+    ))
+}
+
+async fn execute_insert_with_retry(ctx: &FuzzContext, sql: &str) -> Result<()> {
+    let mut delay = Duration::from_millis(100);
+    let mut attempt = 0;
+    let max_attempts = 10;
+    loop {
+        match sqlx::query(sql)
+            .persistent(false)
+            .execute(&ctx.greptime)
+            .await
+        {
+            Ok(_) => return Ok(()),
+            Err(err) => {
+                tokio::time::sleep(delay).await;
+                delay = std::cmp::min(delay * 2, Duration::from_secs(1));
+                attempt += 1;
+                warn!("Execute insert with retry: {sql}, attempt: {attempt}, error: {err:?}");
+                if attempt >= max_attempts {
+                    return Err(err).context(error::ExecuteQuerySnafu { sql });
+                }
+            }
+        }
+    }
+}
+
+struct SharedState {
+    clock: Arc<Mutex<Timestamp>>,
+    inserted_rows: HashMap<String, u64>,
+    csv_dump_session: Option<CsvDumpSession>,
+    sql_dump_session: Option<SqlDumpSession>,
+    running: bool,
+}
+
+enum WriterControl {
+    Barrier {
+        epoch: usize,
+        ack: oneshot::Sender<()>,
+    },
+    Resume {
+        epoch: usize,
+    },
+    Stop,
+}
+
+fn handle_writer_control(control: WriterControl, paused: &mut bool) -> bool {
+    match control {
+        WriterControl::Barrier { epoch, ack } => {
+            info!("Writer received barrier control, epoch: {epoch}");
+            *paused = true;
+            let _ = ack.send(());
+            false
+        }
+        WriterControl::Resume { epoch } => {
+            info!("Writer received resume control, epoch: {epoch}");
+            *paused = false;
+            false
+        }
+        WriterControl::Stop => {
+            info!("Writer received stop control");
+            true
+        }
+    }
+}
+
+async fn write_loop<R: Rng + 'static>(
+    mut rng: R,
+    ctx: FuzzContext,
+    logical_tables: BTreeMap<Ident, TableContextRef>,
+    shared_state: Arc<Mutex<SharedState>>,
+    mut control_rx: mpsc::UnboundedReceiver<WriterControl>,
+) -> Result<()> {
+    info!("Start write loop");
+    let mut paused = false;
+    loop {
+        while let Ok(control) = control_rx.try_recv() {
+            if handle_writer_control(control, &mut paused) {
+                return Ok(());
+            }
+        }
+
+        if paused {
+            match control_rx.recv().await {
+                Some(control) => {
+                    if handle_writer_control(control, &mut paused) {
+                        return Ok(());
+                    }
+                }
+                None => return Ok(()),
+            }
+            continue;
+        }
+
+        let (running, clock) = {
+            let state = shared_state.lock().unwrap();
+            (state.running, state.clock.clone())
+        };
+        if !running {
+            break;
+        }
+
+        for table_ctx in logical_tables.values() {
+            let rows = rng.random_range(1..=3);
+            let insert_expr =
+                generate_insert_expr(rows, &mut rng, table_ctx.clone(), clock.clone())?;
+            let translator = InsertIntoExprTranslator;
+            let sql = translator.translate(&insert_expr)?;
+            let inserted = insert_expr.values_list.len() as u64;
+            let csv_records = InsertExprToCsvRecordsTranslator.translate(&insert_expr)?;
+            let table_name = table_ctx.name.to_string();
+            let full_headers = table_ctx
+                .columns
+                .iter()
+                .map(|column| column.name.value.clone())
+                .collect::<Vec<_>>();
+
+            let started_at_ms = current_time_millis();
+            let now = Instant::now();
+            execute_insert_with_retry(&ctx, &sql).await?;
+            let elapsed = now.elapsed();
+            info!("Execute insert sql: {sql}, elapsed: {elapsed:?}");
+
+            let mut state = shared_state.lock().unwrap();
+            if let Some(csv_dump_session) = state.csv_dump_session.as_mut() {
+                csv_dump_session.append(csv_records, full_headers)?;
+            }
+            if let Some(sql_dump_session) = state.sql_dump_session.as_mut() {
+                let comment = format!(
+                    "kind=insert table={} started_at_ms={} elapsed_ms={}",
+                    table_name,
+                    started_at_ms,
+                    elapsed.as_millis()
+                );
+                sql_dump_session.append_sql(&table_name, &sql, Some(&comment))?;
+            }
+            *state.inserted_rows.entry(table_name).or_insert(0) += inserted;
+        }
+
+        tokio::time::sleep(Duration::from_millis(100)).await;
+    }
+    info!("Write loop ended");
+
+    Ok(())
+}
+
+async fn validate_rows(
+    ctx: &FuzzContext,
+    logical_tables: &BTreeMap<Ident, TableContextRef>,
+    inserted_rows: &HashMap<String, u64>,
+) -> Result<()> {
+    for table_ctx in logical_tables.values() {
+        let expected = *inserted_rows.get(&table_ctx.name.to_string()).unwrap_or(&0) as usize;
+        let count_sql = format!("SELECT COUNT(1) AS count FROM {}", table_ctx.name);
+        let count = retry_with_backoff(
+            || count_values(&ctx.greptime, &count_sql),
+            VALIDATE_QUERY_MAX_ATTEMPTS,
+            VALIDATE_QUERY_INIT_BACKOFF,
+            VALIDATE_QUERY_MAX_BACKOFF,
+        )
+        .await?;
+        let distinct_count_sql = format!(
+            "SELECT COUNT(DISTINCT {}) AS count FROM {}",
+            table_ctx.timestamp_column().unwrap().name,
+            table_ctx.name
+        );
+        let distinct_count = retry_with_backoff(
+            || count_values(&ctx.greptime, &distinct_count_sql),
+            VALIDATE_QUERY_MAX_ATTEMPTS,
+            VALIDATE_QUERY_INIT_BACKOFF,
+            VALIDATE_QUERY_MAX_BACKOFF,
+        )
+        .await?;
+        info!(
+            "Validate rows for table: {}, expected: {}, count: {}, distinct_count: {}",
+            table_ctx.name, expected, count.count as usize, distinct_count.count as usize
+        );
+        assert_eq!(count.count as usize, expected);
+        assert_eq!(distinct_count.count as usize, expected);
+    }
+    Ok(())
+}
+
+fn flush_dump_sessions_and_snapshot(
+    shared_state: &Arc<Mutex<SharedState>>,
+) -> Result<HashMap<String, u64>> {
+    let mut state = shared_state.lock().unwrap();
+    if let Some(csv_dump_session) = state.csv_dump_session.as_mut() {
+        csv_dump_session.flush_all()?;
+    }
+    if let Some(sql_dump_session) = state.sql_dump_session.as_mut() {
+        sql_dump_session.flush_all()?;
+    }
+    Ok(state.inserted_rows.clone())
+}
+
+async fn cleanup_tables(
+    ctx: &FuzzContext,
+    physical_table_ctx: &TableContextRef,
+    logical_tables: &BTreeMap<Ident, TableContextRef>,
+) -> Result<()> {
+    for table_ctx in logical_tables.values() {
+        let drop_logical_sql = format!("DROP TABLE {}", table_ctx.name);
+        let result = sqlx::query(&drop_logical_sql)
+            .execute(&ctx.greptime)
+            .await
+            .context(error::ExecuteQuerySnafu {
+                sql: &drop_logical_sql,
+            })?;
+        info!("Drop logical table: {drop_logical_sql}, result: {result:?}");
+    }
+
+    let drop_physical_sql = format!("DROP TABLE {}", physical_table_ctx.name);
+    let result = sqlx::query(&drop_physical_sql)
+        .execute(&ctx.greptime)
+        .await
+        .context(error::ExecuteQuerySnafu {
+            sql: &drop_physical_sql,
+        })?;
+    info!("Drop physical table: {drop_physical_sql}, result: {result:?}");
+    Ok(())
+}
+
+fn repartition_operation<R: Rng + 'static>(
+    table_ctx: &TableContextRef,
+    rng: &mut R,
+) -> Result<RepartitionExpr> {
+    let split = rng.random_bool(0.5);
+    if table_ctx.partition.as_ref().unwrap().exprs.len() <= 2 || split {
+        let expr = SplitPartitionExprGeneratorBuilder::default()
+            .table_ctx(table_ctx.clone())
+            .build()
+            .unwrap()
+            .generate(rng)?;
+        Ok(RepartitionExpr::Split(expr))
+    } else {
+        let expr = MergePartitionExprGeneratorBuilder::default()
+            .table_ctx(table_ctx.clone())
+            .build()
+            .unwrap()
+            .generate(rng)?;
+        Ok(RepartitionExpr::Merge(expr))
+    }
+}
+
+impl Arbitrary<'_> for FuzzInput {
+    fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
+        let seed = get_fuzz_override::<u64>("SEED").unwrap_or(u.int_in_range(u64::MIN..=u64::MAX)?);
+        let mut rng = ChaChaRng::seed_from_u64(seed);
+        let partitions =
+            get_fuzz_override::<usize>("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8));
+        let max_tables = get_gt_fuzz_input_max_tables();
+        let tables = get_fuzz_override::<usize>("TABLES")
+            .unwrap_or_else(|| rng.random_range(1..=std::cmp::max(1, max_tables)));
+        let max_actions = get_gt_fuzz_input_max_alter_actions();
+        let actions = get_fuzz_override::<usize>("ACTIONS")
+            .unwrap_or_else(|| rng.random_range(1..max_actions));
+
+        Ok(FuzzInput {
+            seed,
+            actions,
+            partitions,
+            tables,
+        })
+    }
+}
+
+async fn execute_repartition_metric_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
+    info!("input: {input:?}");
+    let mut rng = ChaChaRng::seed_from_u64(input.seed);
+    let clock = Arc::new(Mutex::new(Timestamp::current_millis()));
+
+    let (mut physical_table_ctx, logical_tables, create_logical_sqls, create_physical_sql) =
+        create_metric_tables(&ctx, &mut rng, input.partitions, input.tables).await?;
+
+    let mut inserted_rows = HashMap::with_capacity(logical_tables.len());
+    for table_ctx in logical_tables.values() {
+        inserted_rows.insert(table_ctx.name.to_string(), 0);
+    }
+    let csv_dump_session = CsvDumpSession::new(CsvDumpMetadata::new(
+        "fuzz_repartition_metric_table",
+        input.seed,
+        input.actions,
+        input.partitions,
+        input.tables,
+    ))?;
+    let sql_dump_session = SqlDumpSession::new(csv_dump_session.run_dir.clone())?;
+    let logical_table_names = logical_tables
+        .values()
+        .map(|table_ctx| table_ctx.name.to_string())
+        .collect::<Vec<_>>();
+
+    let mut sql_dump_session = sql_dump_session;
+    sql_dump_session.append_sql(
+        &physical_table_ctx.name.to_string(),
+        &create_physical_sql,
Some("kind=create_physical_table"), + )?; + for table_name in &logical_table_names { + if let Some(create_sql) = create_logical_sqls.get(table_name) { + sql_dump_session.append_sql( + table_name, + create_sql, + Some("kind=create_logical_table"), + )?; + } + } + + let shared_state = Arc::new(Mutex::new(SharedState { + clock, + inserted_rows, + csv_dump_session: Some(csv_dump_session), + sql_dump_session: Some(sql_dump_session), + running: true, + })); + let writer_rng = ChaChaRng::seed_from_u64(input.seed ^ 0xA5A5_A5A5_A5A5_A5A5); + let (control_tx, control_rx) = mpsc::unbounded_channel::(); + let writer_task = tokio::spawn(write_loop( + writer_rng, + ctx.clone(), + logical_tables.clone(), + shared_state.clone(), + control_rx, + )); + tokio::time::sleep(Duration::from_millis(100)).await; + + for i in 0..input.actions { + let partition_num = physical_table_ctx.partition.as_ref().unwrap().exprs.len(); + info!( + "partition_num: {partition_num}, action: {}/{}, table: {}, logical table num: {}", + i + 1, + input.actions, + physical_table_ctx.name, + logical_tables.len() + ); + + let repartition_expr = repartition_operation(&physical_table_ctx, &mut rng)?; + let translator = RepartitionExprTranslator; + let sql = translator.translate(&repartition_expr)?; + info!("Repartition sql: {sql}"); + let started_at_ms = current_time_millis(); + let now = Instant::now(); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + let elapsed = now.elapsed(); + info!("Repartition result: {result:?}, elapsed: {elapsed:?}"); + + physical_table_ctx = Arc::new( + Arc::unwrap_or_clone(physical_table_ctx) + .repartition(repartition_expr) + .unwrap(), + ); + + let partition_entries = tests_fuzz::validator::partition::fetch_partitions_info_schema( + &ctx.greptime, + "public".into(), + &physical_table_ctx.name, + ) + .await?; + tests_fuzz::validator::partition::assert_partitions( + physical_table_ctx.partition.as_ref().unwrap(), + &partition_entries, + )?; + + { + let mut state = shared_state.lock().unwrap(); + if let Some(sql_dump_session) = state.sql_dump_session.as_mut() { + let repartition_comment = format!( + "kind=repartition table={} action_idx={} started_at_ms={} elapsed_ms={}", + physical_table_ctx.name, + i + 1, + started_at_ms, + elapsed.as_millis() + ); + sql_dump_session.append_sql( + &physical_table_ctx.name.to_string(), + &sql, + Some(&repartition_comment), + )?; + let event = format!( + "repartition action_idx={} started_at_ms={} elapsed_ms={} sql={}", + i + 1, + started_at_ms, + elapsed.as_millis(), + sql + ); + sql_dump_session.broadcast_event(logical_table_names.iter(), &event, &sql)?; + } + } + + let (ack_tx, ack_rx) = oneshot::channel(); + control_tx + .send(WriterControl::Barrier { + epoch: i + 1, + ack: ack_tx, + }) + .expect("barrier control send must succeed"); + tokio::time::timeout(Duration::from_secs(BARRIER_ACK_TIMEOUT_SECS), ack_rx) + .await + .expect("barrier ack timeout") + .expect("barrier ack dropped"); + + let inserted_rows_snapshot = flush_dump_sessions_and_snapshot(&shared_state)?; + info!("validate rows, epoch: {}", i + 1); + validate_rows(&ctx, &logical_tables, &inserted_rows_snapshot).await?; + + control_tx + .send(WriterControl::Resume { epoch: i + 1 }) + .expect("resume control send must succeed"); + } + + let _ = control_tx.send(WriterControl::Stop); + shared_state.lock().unwrap().running = false; + writer_task.await.unwrap().unwrap(); + let inserted_rows = flush_dump_sessions_and_snapshot(&shared_state)?; + 
+    let (mut csv_dump_session, mut sql_dump_session) = {
+        let mut state = shared_state.lock().unwrap();
+        (state.csv_dump_session.take(), state.sql_dump_session.take())
+    };
+
+    let run_result = async {
+        validate_rows(&ctx, &logical_tables, &inserted_rows).await?;
+        cleanup_tables(&ctx, &physical_table_ctx, &logical_tables).await?;
+        Ok(())
+    }
+    .await;
+
+    if let Some(csv_dump_session) = csv_dump_session.take() {
+        match &run_result {
+            Ok(_) => {
+                if let Err(err) = csv_dump_session.cleanup_on_success() {
+                    warn!(
+                        "Cleanup csv dump directory failed, path: {}, error: {:?}",
+                        csv_dump_session.run_dir.display(),
+                        err
+                    );
+                }
+            }
+            Err(_) => {
+                warn!(
+                    "Keep csv dump directory for failure analysis, path: {}",
+                    csv_dump_session.run_dir.display()
+                );
+            }
+        }
+    }
+    if let Some(sql_dump_session) = sql_dump_session.take()
+        && run_result.is_err()
+    {
+        warn!(
+            "Keep sql dump directory for failure analysis, path: {}",
+            sql_dump_session.run_dir.display()
+        );
+    }
+
+    ctx.close().await;
+    run_result
+}
+
+fuzz_target!(|input: FuzzInput| {
+    common_telemetry::init_default_ut_logging();
+    common_runtime::block_on_global(async {
+        let Connections { mysql } = init_greptime_connections_via_env().await;
+        let ctx = FuzzContext {
+            greptime: mysql.expect("mysql connection init must succeed"),
+        };
+        execute_repartition_metric_table(ctx, input)
+            .await
+            .unwrap_or_else(|err| panic!("fuzz test must succeed: {err:?}"));
+    })
+});
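One practical note: `FuzzInput::arbitrary` consults `get_fuzz_override` for `SEED`, `PARTITIONS`, `TABLES`, and `ACTIONS`, so a failing run recorded in `seed.meta` can presumably be replayed by pinning those values through the `GT_FUZZ_OVERRIDE_` environment variables, following the override pattern already documented in the README. The placeholders below are to be filled from `seed.meta`:

```
GT_FUZZ_OVERRIDE_SEED=<seed> GT_FUZZ_OVERRIDE_PARTITIONS=<partitions> \
GT_FUZZ_OVERRIDE_TABLES=<tables> GT_FUZZ_OVERRIDE_ACTIONS=<actions> \
cargo fuzz run fuzz_repartition_metric_table
```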