From 20f38d8a6aabeb905e2f4a0c21743ad98fa7aee2 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 13 Mar 2026 16:00:09 +0800 Subject: [PATCH] test(fuzz): add metric table repartition fuzz target (#7754) * test: add fuzz_repartition_metric_table target scaffold Signed-off-by: WenyXu * test: add metric logical lifecycle in repartition fuzz target Signed-off-by: WenyXu * test: support partitioned metric tables in repartition fuzz Signed-off-by: WenyXu * test: add repartition loop and partition assertions for metric target Signed-off-by: WenyXu * test: use shared timestamp clock in metric repartition writes Signed-off-by: WenyXu * refactor: unify string value and bound generation for fuzzing Signed-off-by: WenyXu * test: use fixed physical table name in metric repartition fuzz Signed-off-by: WenyXu * chore: fmt Signed-off-by: WenyXu * ci: update ci config Signed-off-by: WenyXu * refactor: use btreemap Signed-off-by: WenyXu * print count result Signed-off-by: WenyXu * test: add csv translator for insert expr Introduce a dedicated top-level csv translator so fuzz insert expressions can be converted into writer-ready records through a structured path instead of ad-hoc formatting in targets. Signed-off-by: WenyXu * test: add csv dump session utilities Introduce CSV dump env helpers and a session writer that creates run directories, emits seed metadata, and flushes staged CSV records for fuzz workflows. Signed-off-by: WenyXu * test: bound csv dump buffer with auto flush Parse readable buffer sizes from env and flush staged CSV records automatically when the in-memory threshold is reached to prevent unbounded growth during long fuzz runs. Signed-off-by: WenyXu * test: flush csv dump before repartition validation Wire csv dump session into the metric repartition fuzz flow so successful inserts are translated from insert expressions into CSV records during write loops and flushed to disk right before row validation. Signed-off-by: WenyXu * test: keep csv dumps on failure and cleanup on pass Capture run outcomes in metric repartition fuzz, remove dump directories only after successful validation, and retain dump paths on failures so CI and local investigations can use the same artifacts. Signed-off-by: WenyXu * test: align partial csv records with table headers Keep append payload compact by storing partial insert-expression columns, then expand to full table-context headers at flush time and fill missing values with empty strings. Signed-off-by: WenyXu * chore: add logs Signed-off-by: WenyXu * dump csv Signed-off-by: WenyXu * ci: dump csv Signed-off-by: WenyXu * refactor Signed-off-by: WenyXu * test: add table-scoped sql dump writer primitives Signed-off-by: WenyXu * test: capture table-scoped sql traces after execution Record insert and repartition SQL only after successful execution, include started_at_ms and elapsed_ms in trace comments, and broadcast repartition events into every logical-table trace file for consistent debugging context. Signed-off-by: WenyXu * test: harden sql trace comments and include create sql Normalize multiline trace comments into valid SQL comment lines and append logical-table CREATE SQL to per-table traces for better timeline reconstruction during repartition debugging. 
Signed-off-by: WenyXu * test: dump physical create and repartition SQL traces Signed-off-by: WenyXu * dump repartition sql Signed-off-by: WenyXu * test: scaffold writer control channel for barrier flow Add Barrier/Resume/Stop control skeleton and channel wiring in write_loop to prepare per-repartition validation barriers. Also align SQL dump tests with broadcast SQL payload behavior. Signed-off-by: WenyXu * test: implement writer barrier pause and resume control Make writer control messages effective by pausing writes on barrier, resuming on resume, and stopping via channel signaling so the next commit can enforce deterministic per-repartition validation boundaries. Signed-off-by: WenyXu * test: validate rows after each repartition barrier Add per-action barrier/ack synchronization with timeout, run immediate logical-table row validation after each repartition, and resume writer only after validation completes to improve minimal failure localization. Signed-off-by: WenyXu * test: flush dump sessions before per-epoch validation Extract a shared flush-and-snapshot helper and call it before each immediate row validation so CSV/SQL artifacts are persisted at the same epoch boundary being validated. Signed-off-by: WenyXu * fix: fix unit tests Signed-off-by: WenyXu * chore: add retry Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- .github/workflows/develop.yml | 16 +- tests-fuzz/Cargo.toml | 7 + tests-fuzz/README.md | 20 + tests-fuzz/src/fake.rs | 20 + tests-fuzz/src/generator/create_expr.rs | 129 +++- tests-fuzz/src/ir.rs | 37 +- tests-fuzz/src/ir/partition_expr.rs | 6 +- tests-fuzz/src/ir/string_value.rs | 162 +++++ tests-fuzz/src/translator.rs | 2 + tests-fuzz/src/translator/csv.rs | 121 ++++ tests-fuzz/src/utils.rs | 42 ++ tests-fuzz/src/utils/csv_dump_writer.rs | 383 ++++++++++ tests-fuzz/src/utils/retry.rs | 49 ++ tests-fuzz/src/utils/sql_dump_writer.rs | 267 +++++++ .../ddl/fuzz_repartition_metric_table.rs | 684 ++++++++++++++++++ 15 files changed, 1892 insertions(+), 53 deletions(-) create mode 100644 tests-fuzz/src/ir/string_value.rs create mode 100644 tests-fuzz/src/translator/csv.rs create mode 100644 tests-fuzz/src/utils/csv_dump_writer.rs create mode 100644 tests-fuzz/src/utils/retry.rs create mode 100644 tests-fuzz/src/utils/sql_dump_writer.rs create mode 100644 tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 0238e92c8d..b6ab0f8926 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -319,7 +319,13 @@ jobs: include: - target: "fuzz_repartition_table" mode: - name: "Local WAL Repartition GC" + name: "Local WAL mito table repartition" + minio: true + kafka: false + values: "with-minio-repartition-gc.yaml" + - target: "fuzz_repartition_metric_table" + mode: + name: "Local WAL metric table repartition" minio: true kafka: false values: "with-minio-repartition-gc.yaml" @@ -455,6 +461,14 @@ jobs: path: /tmp/fuzz-monitor-dumps if-no-files-found: warn retention-days: 3 + - name: Upload CSV dumps + if: failure() + uses: actions/upload-artifact@v4 + with: + name: fuzz-tests-csv-dumps-${{ matrix.mode.name }}-${{ matrix.target }} + path: /tmp/greptime-fuzz-dumps + if-no-files-found: warn + retention-days: 3 - name: Delete cluster if: success() shell: bash diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index a537ca0687..bc687092c0 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -100,6 
+100,13 @@ test = false bench = false doc = false +[[bin]] +name = "fuzz_repartition_metric_table" +path = "targets/ddl/fuzz_repartition_metric_table.rs" +test = false +bench = false +doc = false + [[bin]] name = "fuzz_alter_table" path = "targets/ddl/fuzz_alter_table.rs" diff --git a/tests-fuzz/README.md b/tests-fuzz/README.md index 6807e19a1c..cc9d7eb84e 100644 --- a/tests-fuzz/README.md +++ b/tests-fuzz/README.md @@ -66,3 +66,23 @@ GT_FUZZ_OVERRIDE_SEED=6666 GT_FUZZ_OVERRIDE_ACTIONS=175 cargo fuzz run fuzz_targ ``` For more details, visit [cargo fuzz](https://rust-fuzz.github.io/book/cargo-fuzz/tutorial.html) or run the command `cargo fuzz --help`. + +## Repartition Metric Dump Artifacts + +For `fuzz_repartition_metric_table`, dump artifacts are written under one run directory. + +- Table data snapshots: `<table_name>.table-data.csv` +- SQL traces per logical table: `<table_name>.trace.sql` +- Seed metadata: `seed.meta` + +SQL trace behavior: + +- Insert SQL is appended after successful execution with comment fields including + `started_at_ms` and `elapsed_ms`. +- Repartition events are broadcast to all logical table trace files with comment fields including + `action_idx`, `started_at_ms`, `elapsed_ms`, and SQL text. + +Run directory lifecycle: + +- On success, the run directory is cleaned up. +- On failure, the run directory is retained for CI/local diffing. diff --git a/tests-fuzz/src/fake.rs b/tests-fuzz/src/fake.rs index aa92e0293a..8910a39206 100644 --- a/tests-fuzz/src/fake.rs +++ b/tests-fuzz/src/fake.rs @@ -65,6 +65,26 @@ where _v: PhantomData<V>, } +pub struct ConstGenerator<V> { + value: V, +} + +impl<V> ConstGenerator<V> { + pub fn new(value: V) -> Self { + Self { value } + } +} + +impl<V, R> Random<V, R> for ConstGenerator<V> +where + R: Rng, + V: Clone, +{ + fn choose(&self, _rng: &mut R, amount: usize) -> Vec<V> { + vec![self.value.clone(); amount] + } +} + pub fn random_capitalize_map<R: Rng + 'static>(rng: &mut R, s: Ident) -> Ident { let mut v = s.value.chars().collect::<Vec<char>>();
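The `ConstGenerator` added above pins a value that other targets randomize. A minimal usage sketch follows (assuming this crate's `Random` trait, `Ident` type, and module paths as shown in the diffs; the fixed name matches the one the new fuzz target passes in):

```rust
// Sketch: pinning the physical table name with ConstGenerator so every run,
// regardless of seed, targets the same physical metric table.
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use tests_fuzz::fake::ConstGenerator;
use tests_fuzz::generator::Random;
use tests_fuzz::ir::Ident;

fn main() {
    let mut rng = ChaCha8Rng::seed_from_u64(0);
    let name_generator = ConstGenerator::new(Ident::new("fuzz_repartition_metric_physical"));
    // Every draw yields the same identifier, independent of the rng state.
    let names = name_generator.choose(&mut rng, 3);
    assert!(names.iter().all(|n| n.value == "fuzz_repartition_metric_physical"));
}
```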
diff --git a/tests-fuzz/src/generator/create_expr.rs b/tests-fuzz/src/generator/create_expr.rs index fae6a95eda..261a310db2 100644 --- a/tests-fuzz/src/generator/create_expr.rs +++ b/tests-fuzz/src/generator/create_expr.rs @@ -193,6 +193,26 @@ fn generate_partition_def( } } +fn generate_metric_partition(partitions: usize) -> Option<(Column, PartitionDef)> { + if partitions <= 1 { + return None; + } + + let partition_column = Column { + name: Ident::new("host"), + column_type: ConcreteDataType::string_datatype(), + options: vec![ColumnOption::PrimaryKey], + }; + let bounds = generate_partition_bounds(&partition_column.column_type, partitions - 1); + let partitions = SimplePartitions::new(partition_column.name.clone(), bounds); + let partition_def = PartitionDef { + columns: vec![partitions.column_name.clone()], + exprs: partitions.generate().unwrap(), + }; + + Some((partition_column, partition_def)) +} + /// Generate a physical table with 2 columns: ts of TimestampType::Millisecond as time index and val of Float64Type. #[derive(Builder)] #[builder(pattern = "owned")] @@ -201,6 +221,8 @@ pub struct CreatePhysicalTableExprGenerator<R: Rng + 'static> { name_generator: Box<dyn Random<Ident, R>>, #[builder(default = "false")] if_not_exists: bool, + #[builder(default = "0")] + partition: usize, #[builder(default, setter(into))] with_clause: HashMap<String, String>, } @@ -215,25 +237,35 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreatePhysicalTableExpr options.insert(key.clone(), Value::from(value.clone())); } + let mut columns = vec![ + Column { + name: Ident::new("ts"), + column_type: ConcreteDataType::timestamp_millisecond_datatype(), + options: vec![ColumnOption::TimeIndex], + }, + Column { + name: Ident::new("val"), + column_type: ConcreteDataType::float64_datatype(), + options: vec![], + }, + ]; + + let mut partition = None; + let mut primary_keys = vec![]; + if let Some((partition_column, partition_def)) = generate_metric_partition(self.partition) { + columns.push(partition_column); + partition = Some(partition_def); + primary_keys.push(columns.len() - 1); + } + Ok(CreateTableExpr { table_name: self.name_generator.generate(rng), - columns: vec![ - Column { - name: Ident::new("ts"), - column_type: ConcreteDataType::timestamp_millisecond_datatype(), - options: vec![ColumnOption::TimeIndex], - }, - Column { - name: Ident::new("val"), - column_type: ConcreteDataType::float64_datatype(), - options: vec![], - }, - ], + columns, if_not_exists: self.if_not_exists, - partition: None, + partition, engine: "metric".to_string(), options, - primary_keys: vec![], + primary_keys, }) } } @@ -245,6 +277,8 @@ pub struct CreateLogicalTableExprGenerator<R: Rng + 'static> { physical_table_ctx: TableContextRef, labels: usize, if_not_exists: bool, + #[builder(default = "true")] + include_partition_column: bool, #[builder(default = "Box::new(WordGenerator)")] name_generator: Box<dyn Random<Ident, R>>, } @@ -253,11 +287,11 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateLogicalTableExprG type Error = Error; fn generate(&self, rng: &mut R) -> Result<CreateTableExpr> { - // Currently we mock the usage of GreptimeDB as Prometheus' backend, the physical table must have two columns. + // Currently we mock the usage of GreptimeDB as Prometheus' backend, the physical table must have ts and val.
ensure!( - self.physical_table_ctx.columns.len() == 2, + self.physical_table_ctx.columns.len() >= 2, error::UnexpectedSnafu { - violated: "The physical table must have two columns" + violated: "The physical table must have at least two columns" } ); @@ -265,9 +299,16 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateLogicalTableExprG let logical_table_name = self .physical_table_ctx .generate_unique_table_name(rng, self.name_generator.as_ref()); + let mut physical_columns = self.physical_table_ctx.columns.clone(); + if !self.include_partition_column + && let Some(partition_def) = &self.physical_table_ctx.partition + { + physical_columns.retain(|column| !partition_def.columns.contains(&column.name)); + } + let mut logical_table = CreateTableExpr { table_name: logical_table_name, - columns: self.physical_table_ctx.columns.clone(), + columns: physical_columns, if_not_exists: self.if_not_exists, partition: None, engine: "metric".to_string(), @@ -459,6 +500,58 @@ mod tests { })); } + #[test] + fn test_create_physical_table_expr_generator_with_partition() { + let mut rng = rand::rng(); + let physical_table_expr = CreatePhysicalTableExprGeneratorBuilder::default() + .partition(3) + .if_not_exists(false) + .build() + .unwrap() + .generate(&mut rng) + .unwrap(); + + assert_eq!(physical_table_expr.engine, "metric"); + assert!(physical_table_expr.partition.is_some()); + assert_eq!(physical_table_expr.partition.unwrap().exprs.len(), 3); + } + + #[test] + fn test_create_logical_table_expr_generator_without_partition_column() { + let mut rng = rand::rng(); + let physical_table_expr = CreatePhysicalTableExprGeneratorBuilder::default() + .partition(3) + .if_not_exists(false) + .build() + .unwrap() + .generate(&mut rng) + .unwrap(); + let partition_columns = physical_table_expr + .partition + .as_ref() + .unwrap() + .columns + .clone(); + let physical_table_ctx = Arc::new(TableContext::from(&physical_table_expr)); + + let logical_table_expr = CreateLogicalTableExprGeneratorBuilder::default() + .physical_table_ctx(physical_table_ctx) + .labels(3) + .include_partition_column(false) + .if_not_exists(false) + .build() + .unwrap() + .generate(&mut rng) + .unwrap(); + + assert!( + logical_table_expr + .columns + .iter() + .all(|column| !partition_columns.contains(&column.name)) + ); + } + #[test] fn test_create_logical_table_expr_generator_deterministic() { let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0); diff --git a/tests-fuzz/src/ir.rs b/tests-fuzz/src/ir.rs index e8c15dcf95..ce1628cd61 100644 --- a/tests-fuzz/src/ir.rs +++ b/tests-fuzz/src/ir.rs @@ -20,6 +20,7 @@ pub(crate) mod insert_expr; pub(crate) mod partition_expr; pub(crate) mod repartition_expr; pub(crate) mod select_expr; +pub(crate) mod string_value; use core::fmt; use std::collections::HashMap; @@ -126,20 +127,7 @@ pub fn generate_partition_bounds(datatype: &ConcreteDataType, bounds: usize) -> ConcreteDataType::Int64(_) => generate_values!(i64, bounds), ConcreteDataType::Float32(_) => generate_values!(f32, bounds), ConcreteDataType::Float64(_) => generate_values!(f64, bounds), - ConcreteDataType::String(_) => { - let base = b'A'; - let range = b'z' - b'A'; - let step = range / (bounds as u8 + 1); - (1..=bounds) - .map(|i| { - Value::from( - char::from(base + step * i as u8) - .escape_default() - .to_string(), - ) - }) - .collect() - } + ConcreteDataType::String(_) => string_value::generate_partition_bounds(bounds), _ => unimplemented!("unsupported type: {datatype}"), } } @@ -157,10 +145,7 @@ pub fn generate_random_value<R: Rng>( ConcreteDataType::Int64(_) =>
Value::from(rng.random::<i64>()), ConcreteDataType::Float32(_) => Value::from(rng.random::<f32>()), ConcreteDataType::Float64(_) => Value::from(rng.random::<f64>()), - ConcreteDataType::String(_) => match random_str { - Some(random) => Value::from(random.generate(rng).value), - None => Value::from(rng.random::<char>().to_string()), - }, + ConcreteDataType::String(_) => string_value::generate_data_string_value(rng, random_str), ConcreteDataType::Date(_) => generate_random_date(rng), _ => unimplemented!("unsupported type: {datatype}"), @@ -341,21 +326,7 @@ pub fn generate_partition_value( } } datatypes::data_type::ConcreteDataType::String(_) => { - let upper = match first { - datatypes::value::Value::String(v) => v.as_utf8(), - _ => "", - }; - if bound_idx == 0 { - if upper <= "A" { - datatypes::value::Value::from("") - } else { - datatypes::value::Value::from("A") - } - } else if bound_idx < bounds.len() { - bounds[bound_idx - 1].clone() - } else { - last.clone() - } + string_value::generate_partition_value(bounds, bound_idx) } _ => unimplemented!("unsupported partition column type: {column_type}"), } diff --git a/tests-fuzz/src/ir/partition_expr.rs b/tests-fuzz/src/ir/partition_expr.rs index c91dd487ae..908223366c 100644 --- a/tests-fuzz/src/ir/partition_expr.rs +++ b/tests-fuzz/src/ir/partition_expr.rs @@ -20,7 +20,7 @@ use snafu::ensure; use crate::context::TableContext; use crate::error::{self, Result}; -use crate::ir::{Ident, generate_random_value}; +use crate::ir::{Ident, generate_random_value, string_value}; /// A partitioning scheme that divides a single column into multiple ranges based on provided bounds. /// @@ -245,6 +245,10 @@ pub fn generate_unique_bound<R: Rng>( datatype: &ConcreteDataType, bounds: &[Value], ) -> Result<Value> { + if matches!(datatype, ConcreteDataType::String(_)) { + return string_value::generate_unique_partition_bound(rng, bounds); + } + for _ in 0..16 { let candidate = generate_random_value(rng, datatype, None); if !bounds.contains(&candidate) {
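The string branch above replaces bounded rejection sampling (the 16-attempt loop kept for numeric types) with a deterministic scan, implemented in `string_value.rs` below. A distilled sketch of why the scan always terminates, under the assumption of an injective index-to-token mapping (names here are illustrative, not the crate's API):

```rust
// Scanning an injective index -> token mapping upward from a random start must
// find a fresh value within existing.len() + 1 probes, whereas rejection
// sampling with a fixed attempt budget can fail on small or crowded spaces.
fn first_unused(start: usize, existing: &[String], token: impl Fn(usize) -> String) -> String {
    (start..)
        .map(token)
        .find(|candidate| !existing.iter().any(|b| b == candidate))
        .expect("an injective mapping over an unbounded range always yields a fresh value")
}

fn main() {
    // Toy injective mapping standing in for `readable_token`.
    let token = |i: usize| format!("t{i}");
    let existing = vec!["t5".to_string(), "t6".to_string()];
    assert_eq!(first_unused(5, &existing, token), "t7");
}
```

The real `generate_unique_partition_bound` additionally caps the scan at the token search space and returns an error instead of probing forever.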
diff --git a/tests-fuzz/src/ir/string_value.rs b/tests-fuzz/src/ir/string_value.rs new file mode 100644 index 0000000000..6a53aa69de --- /dev/null +++ b/tests-fuzz/src/ir/string_value.rs @@ -0,0 +1,162 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datatypes::value::Value; +use rand::Rng; + +use crate::error::{self, Result}; +use crate::generator::Random; +use crate::ir::Ident; + +const READABLE_CHARSET: &[u8] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + +fn readable_token(index: usize) -> String { + let base = READABLE_CHARSET.len(); + let mut n = index + 1; + let mut buf = Vec::new(); + + while n > 0 { + let rem = (n - 1) % base; + buf.push(READABLE_CHARSET[rem] as char); + n = (n - 1) / base; + } + + buf.iter().rev().collect() +} + +pub fn generate_data_string_value<R: Rng>( + rng: &mut R, + random_str: Option<&dyn Random<Ident, R>>, +) -> Value { + match random_str { + Some(random) => Value::from(random.generate(rng).value), + None => { + let idx = rng.random_range(0..(READABLE_CHARSET.len() * READABLE_CHARSET.len() * 4)); + Value::from(readable_token(idx)) + } + } +} + +/// Generates ordered readable string bounds for partition expressions. +pub fn generate_partition_bounds(bounds: usize) -> Vec<Value> { + let token_space = READABLE_CHARSET.len() * READABLE_CHARSET.len() * 1024; + (1..=bounds) + .map(|i| { + let idx = i * token_space / (bounds + 1); + Value::from(readable_token(idx)) + }) + .collect() } + +/// Picks a representative string value for the target partition range. +pub fn generate_partition_value(bounds: &[Value], bound_idx: usize) -> Value { + let first = bounds.first().unwrap(); + let last = bounds.last().unwrap(); + let upper = match first { + Value::String(v) => v.as_utf8(), + _ => "", + }; + + if bound_idx == 0 { + if upper <= "0" { + Value::from("") + } else { + Value::from("0") + } + } else if bound_idx < bounds.len() { + bounds[bound_idx - 1].clone() + } else { + last.clone() + } +} + +/// Generates a unique readable bound not present in existing bounds. +pub fn generate_unique_partition_bound<R: Rng>(rng: &mut R, bounds: &[Value]) -> Result<Value> { + let search_space = READABLE_CHARSET.len() * READABLE_CHARSET.len() * 1024; + let start = rng.random_range(0..search_space); + for offset in 0..search_space { + let idx = start + offset; + let candidate = Value::from(readable_token(idx)); + if !bounds.contains(&candidate) { + return Ok(candidate); + } + } + + error::UnexpectedSnafu { + violated: "unable to generate unique string partition bound".to_string(), + } + .fail() +} + +#[cfg(test)] +mod tests { + use rand::SeedableRng; + use rand_chacha::ChaCha8Rng; + + use super::*; + + #[test] + fn test_readable_token_grows_length() { + assert_eq!("0", readable_token(0)); + assert_eq!("9", readable_token(9)); + assert_eq!("A", readable_token(10)); + assert_eq!("z", readable_token(61)); + assert_eq!("00", readable_token(62)); + } + + #[test] + fn test_generate_partition_bounds_are_readable_and_unique() { + let bounds = generate_partition_bounds(8); + assert_eq!(8, bounds.len()); + + let mut values = bounds + .iter() + .map(|v| match v { + Value::String(s) => s.as_utf8().to_string(), + _ => panic!("expected string value"), + }) + .collect::<Vec<_>>(); + let mut dedup = values.clone(); + dedup.sort(); + dedup.dedup(); + assert_eq!(values.len(), dedup.len()); + + for s in values.drain(..)
{ + assert!(s.chars().all(|c| c.is_ascii_alphanumeric())); + } + } + + #[test] + fn test_generate_partition_value_for_string_bounds() { + let bounds = vec![Value::from("A"), Value::from("M")]; + assert_eq!(Value::from("0"), generate_partition_value(&bounds, 0)); + assert_eq!(Value::from("A"), generate_partition_value(&bounds, 1)); + assert_eq!(Value::from("M"), generate_partition_value(&bounds, 2)); + } + + #[test] + fn test_generate_unique_partition_bound_not_in_existing() { + let mut rng = ChaCha8Rng::seed_from_u64(42); + let bounds = vec![Value::from("0"), Value::from("1"), Value::from("2")]; + let candidate = generate_unique_partition_bound(&mut rng, &bounds).unwrap(); + assert!(!bounds.contains(&candidate)); + match candidate { + Value::String(s) => { + assert!(!s.as_utf8().is_empty()); + assert!(s.as_utf8().chars().all(|c| c.is_ascii_alphanumeric())); + } + _ => panic!("expected string value"), + } + } +} diff --git a/tests-fuzz/src/translator.rs b/tests-fuzz/src/translator.rs index 673b543f2c..4c5e0bb6a4 100644 --- a/tests-fuzz/src/translator.rs +++ b/tests-fuzz/src/translator.rs @@ -13,6 +13,8 @@ // limitations under the License. mod common; +/// Translator that converts insert expressions into CSV records. +pub mod csv; pub mod mysql; pub mod postgres; diff --git a/tests-fuzz/src/translator/csv.rs b/tests-fuzz/src/translator/csv.rs new file mode 100644 index 0000000000..e95956862c --- /dev/null +++ b/tests-fuzz/src/translator/csv.rs @@ -0,0 +1,121 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::Error; +use crate::ir::insert_expr::{InsertIntoExpr, RowValue}; +use crate::translator::DslTranslator; + +/// One CSV record converted from an insert row. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CsvRecord { + /// Cell values in column order. + pub values: Vec<String>, +} + +/// CSV records converted from an insert expression. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CsvRecords { + /// Target table name from insert expression. + pub table_name: String, + /// Header values from insert columns. + pub headers: Vec<String>, + /// Converted row records. + pub records: Vec<CsvRecord>, +} + +/// Translates `InsertIntoExpr` into CSV-writer-ready records.
+pub struct InsertExprToCsvRecordsTranslator; + +impl DslTranslator<InsertIntoExpr, CsvRecords> for InsertExprToCsvRecordsTranslator { + type Error = Error; + + fn translate(&self, input: &InsertIntoExpr) -> Result<CsvRecords> { + let headers = input + .columns + .iter() + .map(|column| column.name.to_string()) + .collect::<Vec<_>>(); + let records = input + .values_list + .iter() + .map(|row| CsvRecord { + values: row.iter().map(Self::format_row_value).collect(), + }) + .collect::<Vec<_>>(); + + Ok(CsvRecords { + table_name: input.table_name.to_string(), + headers, + records, + }) + } +} + +impl InsertExprToCsvRecordsTranslator { + fn format_row_value(value: &RowValue) -> String { + match value { + RowValue::Value(datatypes::value::Value::Null) => String::new(), + RowValue::Value(v) => v.to_string(), + RowValue::Default => "DEFAULT".to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use datatypes::data_type::ConcreteDataType; + + use super::InsertExprToCsvRecordsTranslator; + use crate::ir::create_expr::ColumnOption; + use crate::ir::insert_expr::{InsertIntoExpr, RowValue}; + use crate::ir::{Column, Ident}; + use crate::translator::DslTranslator; + + #[test] + fn test_translate_insert_expr_to_csv_records() { + let input = InsertIntoExpr { + table_name: Ident::new("metric_a"), + omit_column_list: false, + columns: vec![ + Column { + name: "host".into(), + column_type: ConcreteDataType::string_datatype(), + options: vec![ColumnOption::PrimaryKey], + }, + Column { + name: "value".into(), + column_type: ConcreteDataType::float64_datatype(), + options: vec![], + }, + ], + values_list: vec![ + vec![ + RowValue::Value(datatypes::value::Value::String("web-1".into())), + RowValue::Value(datatypes::value::Value::Int32(15)), + ], + vec![ + RowValue::Value(datatypes::value::Value::Null), + RowValue::Default, + ], + ], + }; + + let output = InsertExprToCsvRecordsTranslator.translate(&input).unwrap(); + assert_eq!(output.table_name, "metric_a"); + assert_eq!(output.headers, vec!["host", "value"]); + assert_eq!(output.records.len(), 2); + assert_eq!(output.records[0].values, vec!["web-1", "15"]); + assert_eq!(output.records[1].values, vec!["", "DEFAULT"]); + } +} diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs index 0780f6c93d..d55abab3c2 100644 --- a/tests-fuzz/src/utils.rs +++ b/tests-fuzz/src/utils.rs @@ -15,6 +15,8 @@ pub mod cluster_info; pub mod config; pub mod crd; +/// CSV dump writer utilities for fuzz tests. +pub mod csv_dump_writer; pub mod health; pub mod migration; pub mod partition; @@ -22,10 +24,15 @@ pub mod pod_failure; pub mod procedure; #[cfg(feature = "unstable")] pub mod process; +pub mod retry; +/// SQL dump writer utilities for fuzz tests. +pub mod sql_dump_writer; pub mod wait; use std::env; +use std::str::FromStr; +use common_base::readable_size::ReadableSize; use common_telemetry::info; use common_telemetry::tracing::log::LevelFilter; use paste::paste; @@ -126,6 +133,14 @@ pub const GT_FUZZ_INPUT_MAX_COLUMNS: &str = "GT_FUZZ_INPUT_MAX_COLUMNS"; pub const GT_FUZZ_INPUT_MAX_ALTER_ACTIONS: &str = "GT_FUZZ_INPUT_MAX_ALTER_ACTIONS"; pub const GT_FUZZ_INPUT_MAX_INSERT_ACTIONS: &str = "GT_FUZZ_INPUT_MAX_INSERT_ACTIONS"; pub const FUZZ_OVERRIDE_PREFIX: &str = "GT_FUZZ_OVERRIDE_"; +/// Enables CSV dump generation for fuzz runs. +pub const GT_FUZZ_DUMP_TABLE_CSV: &str = "GT_FUZZ_DUMP_TABLE_CSV"; +/// Base directory for CSV dump sessions. +pub const GT_FUZZ_DUMP_DIR: &str = "GT_FUZZ_DUMP_DIR"; +/// Directory suffix used by one CSV dump session.
+pub const GT_FUZZ_DUMP_SUFFIX: &str = "GT_FUZZ_DUMP_SUFFIX"; +/// Max in-memory CSV buffer size before auto flush. +pub const GT_FUZZ_DUMP_BUFFER_MAX_BYTES: &str = "GT_FUZZ_DUMP_BUFFER_MAX_BYTES"; /// Reads an override value for a fuzz parameter from env `GT_FUZZ_OVERRIDE_<NAME>`. pub fn get_fuzz_override<T>(name: &str) -> Option<T> @@ -137,6 +152,33 @@ where env::var(&key).ok().and_then(|v| v.parse().ok()) } +/// Returns CSV dump base directory. +pub fn get_gt_fuzz_dump_dir() -> String { + let _ = dotenv::dotenv(); + env::var(GT_FUZZ_DUMP_DIR).unwrap_or_else(|_| "/tmp/greptime-fuzz-dumps".to_string()) +} + +/// Returns CSV dump directory suffix. +pub fn get_gt_fuzz_dump_suffix() -> String { + let _ = dotenv::dotenv(); + env::var(GT_FUZZ_DUMP_SUFFIX).unwrap_or_else(|_| ".repartition-metric-csv".to_string()) +} + +/// Returns max CSV in-memory buffer size. +pub fn get_gt_fuzz_dump_buffer_max_bytes() -> usize { + let _ = dotenv::dotenv(); + env::var(GT_FUZZ_DUMP_BUFFER_MAX_BYTES) + .ok() + .and_then(|value| { + value.parse::<usize>().ok().or_else(|| { + ReadableSize::from_str(&value) + .ok() + .map(|size| size.as_bytes() as usize) + }) + }) + .unwrap_or(8 * 1024 * 1024) +} + macro_rules! make_get_from_env_helper { ($key:expr, $default: expr) => { paste! {
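As the helper above shows, `GT_FUZZ_DUMP_BUFFER_MAX_BYTES` accepts either a plain byte count or a human-readable size. A minimal sketch of the same parsing logic in isolation (assuming `ReadableSize::from_str` in `common_base::readable_size` recognizes binary-unit suffixes such as `8MiB`):

```rust
// Both value forms resolve to a byte count; a raw integer is tried first,
// then the human-readable spelling via ReadableSize.
use std::str::FromStr;
use common_base::readable_size::ReadableSize;

fn parse_buffer_limit(value: &str) -> Option<usize> {
    value.parse::<usize>().ok().or_else(|| {
        ReadableSize::from_str(value)
            .ok()
            .map(|size| size.as_bytes() as usize)
    })
}

fn main() {
    assert_eq!(parse_buffer_limit("8388608"), Some(8 * 1024 * 1024));
    assert_eq!(parse_buffer_limit("8MiB"), Some(8 * 1024 * 1024));
}
```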
diff --git a/tests-fuzz/src/utils/csv_dump_writer.rs b/tests-fuzz/src/utils/csv_dump_writer.rs new file mode 100644 index 0000000000..de16a23c24 --- /dev/null +++ b/tests-fuzz/src/utils/csv_dump_writer.rs @@ -0,0 +1,383 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::fs::{File, OpenOptions, create_dir_all, remove_dir_all}; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use common_telemetry::{info, warn}; +use common_time::util::current_time_millis; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::translator::csv::CsvRecords; +use crate::utils::{ + get_gt_fuzz_dump_buffer_max_bytes, get_gt_fuzz_dump_dir, get_gt_fuzz_dump_suffix, +}; + +/// Metadata for one CSV dump session. +#[derive(Debug, Clone)] +pub struct CsvDumpMetadata { + /// Fuzz target name. + pub target: String, + /// Seed used by current fuzz input. + pub seed: u64, + /// Repartition action count. + pub actions: usize, + /// Initial partition count. + pub partitions: usize, + /// Logical table count. + pub tables: usize, + /// Session start time in unix milliseconds. + pub started_at_unix_ms: i64, +} + +impl CsvDumpMetadata { + /// Builds dump metadata with current timestamp. + pub fn new( + target: impl Into<String>, + seed: u64, + actions: usize, + partitions: usize, + tables: usize, + ) -> Self { + Self { + target: target.into(), + seed, + actions, + partitions, + tables, + started_at_unix_ms: current_time_millis(), + } + } +} + +/// Session writer for staged CSV dump records. +#[derive(Debug)] +pub struct CsvDumpSession { + /// Session metadata. + pub metadata: CsvDumpMetadata, + /// Session directory path. + pub run_dir: PathBuf, + /// Max in-memory buffer size before auto flush. + pub max_buffer_bytes: usize, + records: Vec<CsvRecords>, + buffered_bytes: usize, + written_tables: HashSet<String>, + full_headers_by_table: HashMap<String, Vec<String>>, +} + +impl CsvDumpSession { + /// Creates session directory and writes seed metadata file. + pub fn new(metadata: CsvDumpMetadata) -> Result<Self> { + Self::new_with_buffer_limit(metadata, get_gt_fuzz_dump_buffer_max_bytes()) + } + + /// Creates session with a custom in-memory buffer limit. + pub fn new_with_buffer_limit( + metadata: CsvDumpMetadata, + max_buffer_bytes: usize, + ) -> Result<Self> { + let run_dir = build_run_dir(&metadata); + create_dir_all(&run_dir).context(error::CreateFileSnafu { + path: run_dir.to_string_lossy().to_string(), + })?; + write_seed_meta(&run_dir, &metadata)?; + info!( + "Create csv dump session, target: {}, run_dir: {}, max_buffer_bytes: {}", + metadata.target, + run_dir.display(), + max_buffer_bytes + ); + + Ok(Self { + metadata, + run_dir, + max_buffer_bytes, + records: Vec::new(), + buffered_bytes: 0, + written_tables: HashSet::new(), + full_headers_by_table: HashMap::new(), + }) + } + + /// Appends one table CSV records batch with full table headers. + pub fn append(&mut self, records: CsvRecords, full_headers: Vec<String>) -> Result<()> { + self.full_headers_by_table + .entry(records.table_name.clone()) + .or_insert(full_headers); + self.buffered_bytes += estimate_csv_records_size(&records); + self.records.push(records); + if self.buffered_bytes >= self.max_buffer_bytes { + self.flush_buffered_records()?; + } + Ok(()) + } + + /// Flushes all appended batches to CSV files. + pub fn flush_all(&mut self) -> Result<()> { + self.flush_buffered_records() + } + + /// Removes session directory after successful validation. + pub fn cleanup_on_success(&self) -> std::io::Result<()> { + match remove_dir_all(&self.run_dir) { + Ok(_) => { + info!( + "Cleanup csv dump directory on success: {}", + self.run_dir.display() + ); + Ok(()) + } + Err(err) => { + warn!( + "Cleanup csv dump directory failed: {}, error: {:?}", + self.run_dir.display(), + err + ); + Err(err) + } + } + } + + fn flush_buffered_records(&mut self) -> Result<()> { + if self.records.is_empty() { + return Ok(()); + } + for batch in &self.records { + write_batch_csv( + &self.run_dir, + batch, + &mut self.written_tables, + &self.full_headers_by_table, + )?; + } + self.records.clear(); + self.buffered_bytes = 0; + Ok(()) + } +} + +fn write_seed_meta(run_dir: &Path, metadata: &CsvDumpMetadata) -> Result<()> { + let path = run_dir.join("seed.meta"); + let mut file = File::create(&path).context(error::CreateFileSnafu { + path: path.to_string_lossy().to_string(), + })?; + + let content = format!( + "target={}\nseed={}\nactions={}\npartitions={}\ntables={}\nstarted_at_unix_ms={}\n", + metadata.target, + metadata.seed, + metadata.actions, + metadata.partitions, + metadata.tables, + metadata.started_at_unix_ms, + ); + file.write_all(content.as_bytes()) + .context(error::WriteFileSnafu { + path: path.to_string_lossy().to_string(), + }) +} + +fn write_batch_csv( + run_dir: &Path, + batch: &CsvRecords, + written_tables: &mut HashSet<String>, + full_headers_by_table: &HashMap<String, Vec<String>>, +) -> Result<()> { + let output_headers = full_headers_by_table + .get(&batch.table_name) + .cloned() + .unwrap_or_else(|| batch.headers.clone()); + let file_name = format!("{}.table-data.csv", sanitize_file_name(&batch.table_name)); + let path = run_dir.join(file_name); + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&path)
.context(error::CreateFileSnafu { + path: path.to_string_lossy().to_string(), + })?; + + if written_tables.insert(batch.table_name.clone()) { + file.write_all(join_line(&output_headers).as_bytes()) + .context(error::WriteFileSnafu { + path: path.to_string_lossy().to_string(), + })?; + file.write_all(b"\n").context(error::WriteFileSnafu { + path: path.to_string_lossy().to_string(), + })?; + } + + let header_index = batch + .headers + .iter() + .enumerate() + .map(|(idx, header)| (header.as_str(), idx)) + .collect::<HashMap<_, _>>(); + + for record in &batch.records { + let aligned_values = output_headers + .iter() + .map(|header| { + header_index + .get(header.as_str()) + .and_then(|idx| record.values.get(*idx)) + .cloned() + .unwrap_or_default() + }) + .collect::<Vec<_>>(); + file.write_all(join_line(&aligned_values).as_bytes()) + .context(error::WriteFileSnafu { + path: path.to_string_lossy().to_string(), + })?; + file.write_all(b"\n").context(error::WriteFileSnafu { + path: path.to_string_lossy().to_string(), + })?; + } + + Ok(()) +} + +fn estimate_csv_records_size(records: &CsvRecords) -> usize { + let headers = records.headers.iter().map(String::len).sum::<usize>(); + let rows = records + .records + .iter() + .flat_map(|record| record.values.iter()) + .map(String::len) + .sum::<usize>(); + headers + rows +} + +fn join_line(cells: &[String]) -> String { + cells + .iter() + .map(|cell| escape_csv_cell(cell)) + .collect::<Vec<_>>() + .join(",") +} + +fn escape_csv_cell(value: &str) -> String { + if value.contains([',', '"', '\n', '\r']) { + format!("\"{}\"", value.replace('"', "\"\"")) + } else { + value.to_string() + } +} + +fn sanitize_file_name(raw: &str) -> String { + raw.chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' { + ch + } else { + '_' + } + }) + .collect() +} + +fn build_run_dir(metadata: &CsvDumpMetadata) -> PathBuf { + let base = PathBuf::from(get_gt_fuzz_dump_dir()); + let suffix = get_gt_fuzz_dump_suffix(); + let name = format!( + "{}_seed_{}_actions_{}_ts_{}{}", + metadata.target, metadata.seed, metadata.actions, metadata.started_at_unix_ms, suffix + ); + base.join(name) +} + +#[cfg(test)] +mod tests { + use super::{CsvDumpMetadata, CsvDumpSession}; + use crate::translator::csv::{CsvRecord, CsvRecords}; + + #[test] + fn test_create_session_and_flush() { + let mut session = CsvDumpSession::new_with_buffer_limit( + CsvDumpMetadata::new("fuzz_case", 1, 2, 3, 4), + 1024, + ) + .unwrap(); + session + .append( + CsvRecords { + table_name: "metric-a".to_string(), + headers: vec!["host".to_string(), "value".to_string()], + records: vec![CsvRecord { + values: vec!["web-1".to_string(), "10".to_string()], + }], + }, + vec!["host".to_string(), "value".to_string()], + ) + .unwrap(); + session.flush_all().unwrap(); + + assert!(session.run_dir.exists()); + assert!(session.run_dir.join("seed.meta").exists()); + assert!(session.run_dir.join("metric-a.table-data.csv").exists()); + } + + #[test] + fn test_auto_flush_on_buffer_limit() { + let mut session = + CsvDumpSession::new_with_buffer_limit(CsvDumpMetadata::new("fuzz_case", 5, 2, 3, 4), 1) + .unwrap(); + session + .append( + CsvRecords { + table_name: "metric-b".to_string(), + headers: vec!["host".to_string()], + records: vec![CsvRecord { + values: vec!["web-2".to_string()], + }], + }, + vec!["host".to_string()], + ) + .unwrap(); + + assert!(session.run_dir.join("metric-b.table-data.csv").exists()); + assert_eq!(session.buffered_bytes, 0); + } + + #[test] + fn test_flush_with_partial_headers_uses_full_headers() { + let mut session =
CsvDumpSession::new_with_buffer_limit( + CsvDumpMetadata::new("fuzz_case", 7, 2, 3, 4), + 1024, + ) + .unwrap(); + session + .append( + CsvRecords { + table_name: "metric-c".to_string(), + headers: vec!["host".to_string(), "value".to_string()], + records: vec![CsvRecord { + values: vec!["web-3".to_string(), "12".to_string()], + }], + }, + vec!["host".to_string(), "idc".to_string(), "value".to_string()], + ) + .unwrap(); + session.flush_all().unwrap(); + + let file = + std::fs::read_to_string(session.run_dir.join("metric-c.table-data.csv")).unwrap(); + let mut lines = file.lines(); + assert_eq!(lines.next().unwrap(), "host,idc,value"); + assert_eq!(lines.next().unwrap(), "web-3,,12"); + } +} diff --git a/tests-fuzz/src/utils/retry.rs b/tests-fuzz/src/utils/retry.rs new file mode 100644 index 0000000000..06d1ede54f --- /dev/null +++ b/tests-fuzz/src/utils/retry.rs @@ -0,0 +1,49 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::time::Duration; + +use common_telemetry::warn; + +pub async fn retry_with_backoff<F, Fut, T, E>( + mut operation: F, + max_attempts: usize, + init_backoff: Duration, + max_backoff: Duration, +) -> Result<T, E> +where + F: FnMut() -> Fut, + Fut: Future<Output = Result<T, E>>, + E: std::fmt::Debug, +{ + let mut backoff = init_backoff; + for attempt in 0..max_attempts { + match operation().await { + Ok(result) => return Ok(result), + Err(err) if attempt + 1 == max_attempts => return Err(err), + Err(err) => { + let current_attempt = attempt + 1; + warn!( + "Retryable operation failed, attempt: {}, max_attempts: {}, backoff: {:?}, error: {:?}", + current_attempt, max_attempts, backoff, err + ); + tokio::time::sleep(backoff).await; + backoff = std::cmp::min(backoff * 2, max_backoff); + } + } + } + + panic!("retry loop should always return") +}
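A minimal usage sketch for `retry_with_backoff`; the flaky counter here is a stand-in for the count queries the fuzz target later wraps during per-epoch validation (the `tests_fuzz::utils::retry` path assumes this crate's layout):

```rust
// Fail twice, then succeed, exercising the exponential backoff path.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use tests_fuzz::utils::retry::retry_with_backoff;

#[tokio::main]
async fn main() {
    let attempts = AtomicUsize::new(0);
    let result: Result<usize, String> = retry_with_backoff(
        || async {
            let n = attempts.fetch_add(1, Ordering::SeqCst);
            if n < 2 {
                Err(format!("transient failure #{n}"))
            } else {
                Ok(n)
            }
        },
        5,                           // max_attempts
        Duration::from_millis(10),   // init_backoff
        Duration::from_millis(100),  // max_backoff (backoff doubles up to this cap)
    )
    .await;
    assert_eq!(result, Ok(2));
}
```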
diff --git a/tests-fuzz/src/utils/sql_dump_writer.rs b/tests-fuzz/src/utils/sql_dump_writer.rs new file mode 100644 index 0000000000..6f098d9584 --- /dev/null +++ b/tests-fuzz/src/utils/sql_dump_writer.rs @@ -0,0 +1,267 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fs::{OpenOptions, create_dir_all}; +use std::io::Write; +use std::path::PathBuf; + +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::utils::get_gt_fuzz_dump_buffer_max_bytes; + +/// Session writer for table-scoped SQL trace files. +#[derive(Debug)] +pub struct SqlDumpSession { + /// Session directory path. + pub run_dir: PathBuf, + /// Max in-memory buffer size before auto flush. + pub max_buffer_bytes: usize, + buffered_bytes: usize, + entries_by_table: HashMap<String, Vec<String>>, +} + +impl SqlDumpSession { + /// Creates SQL dump session with default buffer limit. + pub fn new(run_dir: PathBuf) -> Result<Self> { + Self::new_with_buffer_limit(run_dir, get_gt_fuzz_dump_buffer_max_bytes()) + } + + /// Creates SQL dump session with custom buffer limit. + pub fn new_with_buffer_limit(run_dir: PathBuf, max_buffer_bytes: usize) -> Result<Self> { + create_dir_all(&run_dir).context(error::CreateFileSnafu { + path: run_dir.to_string_lossy().to_string(), + })?; + + Ok(Self { + run_dir, + max_buffer_bytes, + buffered_bytes: 0, + entries_by_table: HashMap::new(), + }) + } + + /// Appends one SQL statement for a logical table. + pub fn append_sql(&mut self, table: &str, sql: &str, comment: Option<&str>) -> Result<()> { + let entry = format_sql_entry(sql, comment); + self.push_entry(table, entry)?; + Ok(()) + } + + /// Broadcasts one comment event to all table trace files. + pub fn broadcast_event<I, T>(&mut self, tables: I, event: &str, sql: &str) -> Result<()> + where + I: IntoIterator<Item = T>, + T: AsRef<str>, + { + let entry = format_sql_entry(sql, Some(event)); + for table in tables { + self.push_entry(table.as_ref(), entry.clone())?; + } + Ok(()) + } + + /// Flushes all staged SQL traces to table-scoped files. + pub fn flush_all(&mut self) -> Result<()> { + self.flush_buffered_entries() + } + + fn push_entry(&mut self, table: &str, entry: String) -> Result<()> { + self.buffered_bytes += entry.len(); + self.entries_by_table + .entry(table.to_string()) + .or_default() + .push(entry); + + if self.buffered_bytes >= self.max_buffer_bytes { + self.flush_buffered_entries()?; + } + Ok(()) + } + + fn flush_buffered_entries(&mut self) -> Result<()> { + if self.entries_by_table.is_empty() { + return Ok(()); + } + + for (table, entries) in &self.entries_by_table { + let path = self + .run_dir + .join(format!("{}.trace.sql", sanitize_file_name(table))); + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .context(error::CreateFileSnafu { + path: path.to_string_lossy().to_string(), + })?; + + for entry in entries { + file.write_all(entry.as_bytes()) + .context(error::WriteFileSnafu { + path: path.to_string_lossy().to_string(), + })?; + file.write_all(b"\n").context(error::WriteFileSnafu { + path: path.to_string_lossy().to_string(), + })?; + } + } + + self.entries_by_table.clear(); + self.buffered_bytes = 0; + Ok(()) + } +} + +fn format_sql_entry(sql: &str, comment: Option<&str>) -> String { + let normalized_sql = normalize_sql(sql); + if let Some(comment) = comment { + format!("{}\n{normalized_sql}", format_comment(comment)) + } else { + normalized_sql + } +} + +fn format_comment(comment: &str) -> String { + comment + .lines() + .map(|line| format!("-- {line}")) + .collect::<Vec<_>>() + .join("\n") +} + +fn normalize_sql(sql: &str) -> String { + let trimmed = sql.trim_end(); + if trimmed.ends_with(';') { + trimmed.to_string() + } else { + format!("{trimmed};") + } +} + +fn sanitize_file_name(raw: &str) -> String { + raw.chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' { + ch + } else { + '_' + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use std::time::{SystemTime, UNIX_EPOCH}; + + use super::SqlDumpSession; + + #[test] + fn test_append_sql_writes_table_trace_file() { + let run_dir = std::env::temp_dir().join(format!( + "tests-fuzz-sql-dump-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH)
+ .unwrap() + .as_millis() + )); + + let mut session = SqlDumpSession::new_with_buffer_limit(run_dir.clone(), 1024).unwrap(); + session + .append_sql( + "metric-a", + "INSERT INTO t VALUES(1)", + Some("kind=insert elapsed_ms=10"), + ) + .unwrap(); + session.flush_all().unwrap(); + + let content = std::fs::read_to_string(run_dir.join("metric-a.trace.sql")).unwrap(); + assert!(content.contains("-- kind=insert elapsed_ms=10")); + assert!(content.contains("INSERT INTO t VALUES(1);")); + } + + #[test] + fn test_broadcast_event_writes_to_all_tables() { + let run_dir = std::env::temp_dir().join(format!( + "tests-fuzz-sql-broadcast-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + )); + + let mut session = SqlDumpSession::new_with_buffer_limit(run_dir.clone(), 1024).unwrap(); + session + .broadcast_event( + ["metric-a", "metric-b"], + "repartition action_idx=3", + "ALTER TABLE t REPARTITION", + ) + .unwrap(); + session.flush_all().unwrap(); + + let content_a = std::fs::read_to_string(run_dir.join("metric-a.trace.sql")).unwrap(); + let content_b = std::fs::read_to_string(run_dir.join("metric-b.trace.sql")).unwrap(); + assert!(content_a.contains("-- repartition action_idx=3")); + assert!(content_a.contains("ALTER TABLE t REPARTITION;")); + assert!(content_b.contains("-- repartition action_idx=3")); + assert!(content_b.contains("ALTER TABLE t REPARTITION;")); + } + + #[test] + fn test_multiline_comment_is_prefixed_per_line() { + let run_dir = std::env::temp_dir().join(format!( + "tests-fuzz-sql-dump-comment-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + )); + + let mut session = SqlDumpSession::new_with_buffer_limit(run_dir.clone(), 1024).unwrap(); + session + .append_sql( + "metric-a", + "INSERT INTO t VALUES(1)", + Some("kind=insert\nstarted_at_ms=1 elapsed_ms=2"), + ) + .unwrap(); + session.flush_all().unwrap(); + + let content = std::fs::read_to_string(run_dir.join("metric-a.trace.sql")).unwrap(); + assert!(content.contains("-- kind=insert\n-- started_at_ms=1 elapsed_ms=2")); + } + + #[test] + fn test_auto_flush_on_buffer_limit() { + let run_dir = std::env::temp_dir().join(format!( + "tests-fuzz-sql-dump-limit-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + )); + + let mut session = SqlDumpSession::new_with_buffer_limit(run_dir.clone(), 1).unwrap(); + session + .append_sql("metric-a", "INSERT INTO t VALUES(1)", None) + .unwrap(); + + assert!(run_dir.join("metric-a.trace.sql").exists()); + assert_eq!(session.buffered_bytes, 0); + } +} diff --git a/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs b/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs new file mode 100644 index 0000000000..7932bc7759 --- /dev/null +++ b/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs @@ -0,0 +1,684 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#![no_main] + +use std::collections::{BTreeMap, HashMap}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use arbitrary::{Arbitrary, Unstructured}; +use common_telemetry::{info, warn}; +use common_time::Timestamp; +use common_time::util::current_time_millis; +use libfuzzer_sys::fuzz_target; +use rand::{Rng, SeedableRng}; +use rand_chacha::ChaChaRng; +use snafu::{ResultExt, ensure}; +use sqlx::{MySql, Pool}; +use tests_fuzz::context::{TableContext, TableContextRef}; +use tests_fuzz::error::{self, Result}; +use tests_fuzz::fake::{ + ConstGenerator, MappedGenerator, WordGenerator, merge_two_word_map_fn, random_capitalize_map, + uppercase_and_keyword_backtick_map, +}; +use tests_fuzz::generator::Generator; +use tests_fuzz::generator::create_expr::{ + CreateLogicalTableExprGeneratorBuilder, CreatePhysicalTableExprGeneratorBuilder, +}; +use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; +use tests_fuzz::generator::repartition_expr::{ + MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder, +}; +use tests_fuzz::ir::{ + CreateTableExpr, Ident, InsertIntoExpr, RepartitionExpr, generate_random_value, + generate_unique_timestamp_for_mysql_with_clock, +}; +use tests_fuzz::translator::DslTranslator; +use tests_fuzz::translator::csv::InsertExprToCsvRecordsTranslator; +use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; +use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; +use tests_fuzz::translator::mysql::repartition_expr::RepartitionExprTranslator; +use tests_fuzz::utils::csv_dump_writer::{CsvDumpMetadata, CsvDumpSession}; +use tests_fuzz::utils::retry::retry_with_backoff; +use tests_fuzz::utils::sql_dump_writer::SqlDumpSession; +use tests_fuzz::utils::{ + Connections, get_fuzz_override, get_gt_fuzz_input_max_alter_actions, + get_gt_fuzz_input_max_tables, init_greptime_connections_via_env, +}; +use tests_fuzz::validator::row::count_values; +use tokio::sync::{mpsc, oneshot}; + +const BARRIER_ACK_TIMEOUT_SECS: u64 = 10; +const VALIDATE_QUERY_MAX_ATTEMPTS: usize = 6; +const VALIDATE_QUERY_INIT_BACKOFF: Duration = Duration::from_millis(50); +const VALIDATE_QUERY_MAX_BACKOFF: Duration = Duration::from_millis(800); + +#[derive(Clone)] +struct FuzzContext { + greptime: Pool<MySql>, +} + +impl FuzzContext { + async fn close(self) { + self.greptime.close().await; + } } + +#[derive(Clone, Debug)] +struct FuzzInput { + seed: u64, + actions: usize, + partitions: usize, + tables: usize, +} + +fn generate_create_physical_table_expr<R: Rng + 'static>( + partitions: usize, + rng: &mut R, +) -> Result<CreateTableExpr> { + CreatePhysicalTableExprGeneratorBuilder::default() + .name_generator(Box::new(ConstGenerator::new(Ident::new( + "fuzz_repartition_metric_physical", + )))) + .if_not_exists(rng.random_bool(0.5)) + .partition(partitions) + .build() + .unwrap() + .generate(rng) +} + +fn generate_create_logical_table_expr<R: Rng + 'static>( + physical_table_ctx: TableContextRef, + include_partition_column: bool, + rng: &mut R, +) -> Result<CreateTableExpr> { + CreateLogicalTableExprGeneratorBuilder::default() + .name_generator(Box::new(MappedGenerator::new( + WordGenerator, + merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map), + ))) + .physical_table_ctx(physical_table_ctx) + .labels(rng.random_range(1..=5)) + .if_not_exists(rng.random_bool(0.5)) + .include_partition_column(include_partition_column) + .build() + .unwrap() + .generate(rng) +} + +fn generate_insert_expr<R: Rng + 'static>( + rows: usize, + rng: &mut R, + table_ctx: TableContextRef, + clock: Arc<Mutex<Timestamp>>, +) -> Result<InsertIntoExpr> { + let ts_value_generator = generate_unique_timestamp_for_mysql_with_clock(clock);
+ InsertExprGeneratorBuilder::default() + .omit_column_list(false) + .table_ctx(table_ctx) + .rows(rows) + .value_generator(Box::new(generate_random_value)) + .ts_value_generator(ts_value_generator) + .build() + .unwrap() + .generate(rng) +} + +async fn create_metric_tables<R: Rng + 'static>( + ctx: &FuzzContext, + rng: &mut R, + partitions: usize, + table_count: usize, +) -> Result<( + TableContextRef, + BTreeMap<Ident, TableContextRef>, + HashMap<String, String>, + String, +)> { + let create_physical_expr = generate_create_physical_table_expr(partitions, rng)?; + let translator = CreateTableExprTranslator; + let create_physical_sql = translator.translate(&create_physical_expr)?; + let result = sqlx::query(&create_physical_sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { + sql: &create_physical_sql, + })?; + info!("Create physical table: {create_physical_sql}, result: {result:?}"); + let physical_table_ctx = Arc::new(TableContext::from(&create_physical_expr)); + ensure!( + physical_table_ctx.partition.is_some(), + error::AssertSnafu { + reason: "Physical metric table must have partition".to_string() + } + ); + + let mut logical_tables = BTreeMap::new(); + let mut create_logical_sqls = HashMap::new(); + let max_attempts = table_count * 3; + for _ in 0..max_attempts { + if logical_tables.len() >= table_count { + break; + } + + let include_partition_column = rng.random_bool(0.5); + let create_logical_expr = generate_create_logical_table_expr( + physical_table_ctx.clone(), + include_partition_column, + rng, + )?; + if logical_tables.contains_key(&create_logical_expr.table_name) { + continue; + } + + let create_logical_sql = translator.translate(&create_logical_expr)?; + let result = sqlx::query(&create_logical_sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { + sql: &create_logical_sql, + })?; + info!("Create logical table: {create_logical_sql}, result: {result:?}"); + let logical_ctx = Arc::new(TableContext::from(&create_logical_expr)); + create_logical_sqls.insert(logical_ctx.name.to_string(), create_logical_sql); + logical_tables.insert(logical_ctx.name.clone(), logical_ctx); + } + + ensure!( + !logical_tables.is_empty(), + error::AssertSnafu { + reason: "No logical table created".to_string() + } + ); + + Ok(( + physical_table_ctx, + logical_tables, + create_logical_sqls, + create_physical_sql, + )) +} + +async fn execute_insert_with_retry(ctx: &FuzzContext, sql: &str) -> Result<()> { + let mut delay = Duration::from_millis(100); + let mut attempt = 0; + let max_attempts = 10; + loop { + match sqlx::query(sql) + .persistent(false) + .execute(&ctx.greptime) + .await + { + Ok(_) => return Ok(()), + Err(err) => { + tokio::time::sleep(delay).await; + delay = std::cmp::min(delay * 2, Duration::from_secs(1)); + attempt += 1; + warn!("Execute insert with retry: {sql}, attempt: {attempt}, error: {err:?}"); + if attempt >= max_attempts { + return Err(err).context(error::ExecuteQuerySnafu { sql }); + } + } + } + } +} + +struct SharedState { + clock: Arc<Mutex<Timestamp>>, + inserted_rows: HashMap<String, u64>, + csv_dump_session: Option<CsvDumpSession>, + sql_dump_session: Option<SqlDumpSession>, + running: bool, +} + +enum WriterControl { + Barrier { + epoch: usize, + ack: oneshot::Sender<()>, + }, + Resume { + epoch: usize, + }, + Stop, +}
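A distilled sketch of the barrier handshake the control channel above enables: the coordinator sends `Barrier` carrying a oneshot ack, the writer pauses and acks, the coordinator validates while writes are quiescent, then sends `Resume`. Names here are illustrative, not the target's API:

```rust
use tokio::sync::{mpsc, oneshot};

enum Control {
    Barrier { ack: oneshot::Sender<()> },
    Resume,
    Stop,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<Control>();
    let writer = tokio::spawn(async move {
        let mut paused = false;
        while let Some(control) = rx.recv().await {
            match control {
                Control::Barrier { ack } => {
                    paused = true;
                    let _ = ack.send(()); // writer is now quiescent
                }
                Control::Resume => paused = false,
                Control::Stop => break,
            }
        }
        paused
    });

    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send(Control::Barrier { ack: ack_tx }).unwrap();
    ack_rx.await.unwrap(); // validation would run here, with writes paused
    tx.send(Control::Resume).unwrap();
    tx.send(Control::Stop).unwrap();
    assert!(!writer.await.unwrap());
}
```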
false + } + WriterControl::Resume { epoch } => { + info!("Writer received resume control, epoch: {epoch}"); + *paused = false; + false + } + WriterControl::Stop => { + info!("Writer received stop control"); + true + } + } +} + +async fn write_loop( + mut rng: R, + ctx: FuzzContext, + logical_tables: BTreeMap, + shared_state: Arc>, + mut control_rx: mpsc::UnboundedReceiver, +) -> Result<()> { + info!("Start write loop"); + let mut paused = false; + loop { + while let Ok(control) = control_rx.try_recv() { + if handle_writer_control(control, &mut paused) { + return Ok(()); + } + } + + if paused { + match control_rx.recv().await { + Some(control) => { + if handle_writer_control(control, &mut paused) { + return Ok(()); + } + } + None => return Ok(()), + } + continue; + } + + let (running, clock) = { + let state = shared_state.lock().unwrap(); + (state.running, state.clock.clone()) + }; + if !running { + break; + } + + for table_ctx in logical_tables.values() { + let rows = rng.random_range(1..=3); + let insert_expr = + generate_insert_expr(rows, &mut rng, table_ctx.clone(), clock.clone())?; + let translator = InsertIntoExprTranslator; + let sql = translator.translate(&insert_expr)?; + let inserted = insert_expr.values_list.len() as u64; + let csv_records = InsertExprToCsvRecordsTranslator.translate(&insert_expr)?; + let table_name = table_ctx.name.to_string(); + let full_headers = table_ctx + .columns + .iter() + .map(|column| column.name.value.clone()) + .collect::>(); + + let started_at_ms = current_time_millis(); + let now = Instant::now(); + execute_insert_with_retry(&ctx, &sql).await?; + let elapsed = now.elapsed(); + info!("Execute insert sql: {sql}, elapsed: {elapsed:?}"); + + let mut state = shared_state.lock().unwrap(); + if let Some(csv_dump_session) = state.csv_dump_session.as_mut() { + csv_dump_session.append(csv_records, full_headers)?; + } + if let Some(sql_dump_session) = state.sql_dump_session.as_mut() { + let comment = format!( + "kind=insert table={} started_at_ms={} elapsed_ms={}", + table_name, + started_at_ms, + elapsed.as_millis() + ); + sql_dump_session.append_sql(&table_name, &sql, Some(&comment))?; + } + *state.inserted_rows.entry(table_name).or_insert(0) += inserted; + } + + tokio::time::sleep(Duration::from_millis(100)).await; + } + info!("Write loop ended"); + + Ok(()) +} + +async fn validate_rows( + ctx: &FuzzContext, + logical_tables: &BTreeMap, + inserted_rows: &HashMap, +) -> Result<()> { + for table_ctx in logical_tables.values() { + let expected = *inserted_rows.get(&table_ctx.name.to_string()).unwrap_or(&0) as usize; + let count_sql = format!("SELECT COUNT(1) AS count FROM {}", table_ctx.name); + let count = retry_with_backoff( + || count_values(&ctx.greptime, &count_sql), + VALIDATE_QUERY_MAX_ATTEMPTS, + VALIDATE_QUERY_INIT_BACKOFF, + VALIDATE_QUERY_MAX_BACKOFF, + ) + .await?; + let distinct_count_sql = format!( + "SELECT COUNT(DISTINCT {}) AS count FROM {}", + table_ctx.timestamp_column().unwrap().name, + table_ctx.name + ); + let distinct_count = retry_with_backoff( + || count_values(&ctx.greptime, &distinct_count_sql), + VALIDATE_QUERY_MAX_ATTEMPTS, + VALIDATE_QUERY_INIT_BACKOFF, + VALIDATE_QUERY_MAX_BACKOFF, + ) + .await?; + info!( + "Validate rows for table: {}, expected: {}, count: {}, distinct_count: {}", + table_ctx.name, expected, count.count as usize, distinct_count.count as usize + ); + assert_eq!(count.count as usize, expected); + + assert_eq!(distinct_count.count as usize, expected); + } + Ok(()) +} + +fn flush_dump_sessions_and_snapshot( + 
+fn flush_dump_sessions_and_snapshot(
+    shared_state: &Arc<Mutex<SharedState>>,
+) -> Result<HashMap<String, u64>> {
+    let mut state = shared_state.lock().unwrap();
+    if let Some(csv_dump_session) = state.csv_dump_session.as_mut() {
+        csv_dump_session.flush_all()?;
+    }
+    if let Some(sql_dump_session) = state.sql_dump_session.as_mut() {
+        sql_dump_session.flush_all()?;
+    }
+    Ok(state.inserted_rows.clone())
+}
+
+async fn cleanup_tables(
+    ctx: &FuzzContext,
+    physical_table_ctx: &TableContextRef,
+    logical_tables: &BTreeMap<Ident, TableContextRef>,
+) -> Result<()> {
+    for table_ctx in logical_tables.values() {
+        let drop_logical_sql = format!("DROP TABLE {}", table_ctx.name);
+        let result = sqlx::query(&drop_logical_sql)
+            .execute(&ctx.greptime)
+            .await
+            .context(error::ExecuteQuerySnafu {
+                sql: &drop_logical_sql,
+            })?;
+        info!("Drop logical table: {drop_logical_sql}, result: {result:?}");
+    }
+
+    let drop_physical_sql = format!("DROP TABLE {}", physical_table_ctx.name);
+    let result = sqlx::query(&drop_physical_sql)
+        .execute(&ctx.greptime)
+        .await
+        .context(error::ExecuteQuerySnafu {
+            sql: &drop_physical_sql,
+        })?;
+    info!("Drop physical table: {drop_physical_sql}, result: {result:?}");
+    Ok(())
+}
+
+fn repartition_operation<R: Rng + 'static>(
+    table_ctx: &TableContextRef,
+    rng: &mut R,
+) -> Result<RepartitionExpr> {
+    let split = rng.random_bool(0.5);
+    // Always split while at most two partitions exist; otherwise flip a coin
+    // between splitting and merging.
+    if table_ctx.partition.as_ref().unwrap().exprs.len() <= 2 || split {
+        let expr = SplitPartitionExprGeneratorBuilder::default()
+            .table_ctx(table_ctx.clone())
+            .build()
+            .unwrap()
+            .generate(rng)?;
+        Ok(RepartitionExpr::Split(expr))
+    } else {
+        let expr = MergePartitionExprGeneratorBuilder::default()
+            .table_ctx(table_ctx.clone())
+            .build()
+            .unwrap()
+            .generate(rng)?;
+        Ok(RepartitionExpr::Merge(expr))
+    }
+}
+
+impl Arbitrary<'_> for FuzzInput {
+    fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
+        let seed =
+            get_fuzz_override::<u64>("SEED").unwrap_or(u.int_in_range(u64::MIN..=u64::MAX)?);
+        let mut rng = ChaChaRng::seed_from_u64(seed);
+        let partitions =
+            get_fuzz_override::<usize>("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8));
+        let max_tables = get_gt_fuzz_input_max_tables();
+        let tables = get_fuzz_override::<usize>("TABLES")
+            .unwrap_or_else(|| rng.random_range(1..=std::cmp::max(1, max_tables)));
+        let max_actions = get_gt_fuzz_input_max_alter_actions();
+        let actions = get_fuzz_override::<usize>("ACTIONS")
+            .unwrap_or_else(|| rng.random_range(1..max_actions));
+
+        Ok(FuzzInput {
+            seed,
+            actions,
+            partitions,
+            tables,
+        })
+    }
+}
+
+async fn execute_repartition_metric_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
+    info!("input: {input:?}");
+    let mut rng = ChaChaRng::seed_from_u64(input.seed);
+    let clock = Arc::new(Mutex::new(Timestamp::current_millis()));
+
+    let (mut physical_table_ctx, logical_tables, create_logical_sqls, create_physical_sql) =
+        create_metric_tables(&ctx, &mut rng, input.partitions, input.tables).await?;
+
+    let mut inserted_rows = HashMap::with_capacity(logical_tables.len());
+    for table_ctx in logical_tables.values() {
+        inserted_rows.insert(table_ctx.name.to_string(), 0);
+    }
+    let csv_dump_session = CsvDumpSession::new(CsvDumpMetadata::new(
+        "fuzz_repartition_metric_table",
+        input.seed,
+        input.actions,
+        input.partitions,
+        input.tables,
+    ))?;
+    let mut sql_dump_session = SqlDumpSession::new(csv_dump_session.run_dir.clone())?;
+    let logical_table_names = logical_tables
+        .values()
+        .map(|table_ctx| table_ctx.name.to_string())
+        .collect::<Vec<_>>();
+
+    sql_dump_session.append_sql(
+        &physical_table_ctx.name.to_string(),
+        &create_physical_sql,
+        Some("kind=create_physical_table"),
+    )?;
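+    // Replay each logical table's CREATE statement into its per-table SQL trace
+    // so a failing run's timeline starts at schema creation.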
Some("kind=create_physical_table"), + )?; + for table_name in &logical_table_names { + if let Some(create_sql) = create_logical_sqls.get(table_name) { + sql_dump_session.append_sql( + table_name, + create_sql, + Some("kind=create_logical_table"), + )?; + } + } + + let shared_state = Arc::new(Mutex::new(SharedState { + clock, + inserted_rows, + csv_dump_session: Some(csv_dump_session), + sql_dump_session: Some(sql_dump_session), + running: true, + })); + let writer_rng = ChaChaRng::seed_from_u64(input.seed ^ 0xA5A5_A5A5_A5A5_A5A5); + let (control_tx, control_rx) = mpsc::unbounded_channel::(); + let writer_task = tokio::spawn(write_loop( + writer_rng, + ctx.clone(), + logical_tables.clone(), + shared_state.clone(), + control_rx, + )); + tokio::time::sleep(Duration::from_millis(100)).await; + + for i in 0..input.actions { + let partition_num = physical_table_ctx.partition.as_ref().unwrap().exprs.len(); + info!( + "partition_num: {partition_num}, action: {}/{}, table: {}, logical table num: {}", + i + 1, + input.actions, + physical_table_ctx.name, + logical_tables.len() + ); + + let repartition_expr = repartition_operation(&physical_table_ctx, &mut rng)?; + let translator = RepartitionExprTranslator; + let sql = translator.translate(&repartition_expr)?; + info!("Repartition sql: {sql}"); + let started_at_ms = current_time_millis(); + let now = Instant::now(); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + let elapsed = now.elapsed(); + info!("Repartition result: {result:?}, elapsed: {elapsed:?}"); + + physical_table_ctx = Arc::new( + Arc::unwrap_or_clone(physical_table_ctx) + .repartition(repartition_expr) + .unwrap(), + ); + + let partition_entries = tests_fuzz::validator::partition::fetch_partitions_info_schema( + &ctx.greptime, + "public".into(), + &physical_table_ctx.name, + ) + .await?; + tests_fuzz::validator::partition::assert_partitions( + physical_table_ctx.partition.as_ref().unwrap(), + &partition_entries, + )?; + + { + let mut state = shared_state.lock().unwrap(); + if let Some(sql_dump_session) = state.sql_dump_session.as_mut() { + let repartition_comment = format!( + "kind=repartition table={} action_idx={} started_at_ms={} elapsed_ms={}", + physical_table_ctx.name, + i + 1, + started_at_ms, + elapsed.as_millis() + ); + sql_dump_session.append_sql( + &physical_table_ctx.name.to_string(), + &sql, + Some(&repartition_comment), + )?; + let event = format!( + "repartition action_idx={} started_at_ms={} elapsed_ms={} sql={}", + i + 1, + started_at_ms, + elapsed.as_millis(), + sql + ); + sql_dump_session.broadcast_event(logical_table_names.iter(), &event, &sql)?; + } + } + + let (ack_tx, ack_rx) = oneshot::channel(); + control_tx + .send(WriterControl::Barrier { + epoch: i + 1, + ack: ack_tx, + }) + .expect("barrier control send must succeed"); + tokio::time::timeout(Duration::from_secs(BARRIER_ACK_TIMEOUT_SECS), ack_rx) + .await + .expect("barrier ack timeout") + .expect("barrier ack dropped"); + + let inserted_rows_snapshot = flush_dump_sessions_and_snapshot(&shared_state)?; + info!("validate rows, epoch: {}", i + 1); + validate_rows(&ctx, &logical_tables, &inserted_rows_snapshot).await?; + + control_tx + .send(WriterControl::Resume { epoch: i + 1 }) + .expect("resume control send must succeed"); + } + + let _ = control_tx.send(WriterControl::Stop); + shared_state.lock().unwrap().running = false; + writer_task.await.unwrap().unwrap(); + let inserted_rows = flush_dump_sessions_and_snapshot(&shared_state)?; + 
+    let (mut csv_dump_session, mut sql_dump_session) = {
+        let mut state = shared_state.lock().unwrap();
+        (state.csv_dump_session.take(), state.sql_dump_session.take())
+    };
+
+    let run_result = async {
+        validate_rows(&ctx, &logical_tables, &inserted_rows).await?;
+        cleanup_tables(&ctx, &physical_table_ctx, &logical_tables).await?;
+        Ok(())
+    }
+    .await;
+
+    if let Some(csv_dump_session) = csv_dump_session.take() {
+        match &run_result {
+            Ok(_) => {
+                if let Err(err) = csv_dump_session.cleanup_on_success() {
+                    warn!(
+                        "Cleanup csv dump directory failed, path: {}, error: {:?}",
+                        csv_dump_session.run_dir.display(),
+                        err
+                    );
+                }
+            }
+            Err(_) => {
+                warn!(
+                    "Keep csv dump directory for failure analysis, path: {}",
+                    csv_dump_session.run_dir.display()
+                );
+            }
+        }
+    }
+    if let Some(sql_dump_session) = sql_dump_session.take()
+        && run_result.is_err()
+    {
+        warn!(
+            "Keep sql dump directory for failure analysis, path: {}",
+            sql_dump_session.run_dir.display()
+        );
+    }
+
+    ctx.close().await;
+    run_result
+}
+
+fuzz_target!(|input: FuzzInput| {
+    common_telemetry::init_default_ut_logging();
+    common_runtime::block_on_global(async {
+        let Connections { mysql } = init_greptime_connections_via_env().await;
+        let ctx = FuzzContext {
+            greptime: mysql.expect("mysql connection init must succeed"),
+        };
+        execute_repartition_metric_table(ctx, input)
+            .await
+            .unwrap_or_else(|err| panic!("fuzz test must succeed: {err:?}"));
+    })
+});
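+
+// Local runs (assumed from the tests-fuzz README conventions): point
+// GT_MYSQL_ADDR at a running GreptimeDB MySQL endpoint, then run
+// `cargo fuzz run fuzz_repartition_metric_table --fuzz-dir tests-fuzz`.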