From 76fac359cd2474c86c7f8902b7128b67fa4433f3 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 3 Jul 2024 21:30:41 +0800 Subject: [PATCH] feat: implement naive fuzz test for region migration (#4252) * fix(fuzz): adapt for new partition rules * feat: implement naive fuzz test for region migration * chore(ci): add ci cfg * chore: apply suggestions from CR * chore: apply suggestions from CR --- .github/workflows/develop.yml | 2 +- tests-fuzz/Cargo.toml | 7 + tests-fuzz/src/context.rs | 5 + tests-fuzz/src/generator/create_expr.rs | 64 +++- tests-fuzz/src/ir.rs | 85 ++++- tests-fuzz/src/ir/insert_expr.rs | 39 +++ .../src/translator/mysql/create_expr.rs | 56 ++-- tests-fuzz/src/utils.rs | 2 + tests-fuzz/src/utils/migration.rs | 92 ++++++ tests-fuzz/src/utils/procedure.rs | 55 ++++ tests-fuzz/targets/fuzz_insert.rs | 22 +- .../targets/fuzz_insert_logical_table.rs | 22 +- .../migration/fuzz_migrate_mito_regions.rs | 305 ++++++++++++++++++ 13 files changed, 670 insertions(+), 86 deletions(-) create mode 100644 tests-fuzz/src/utils/migration.rs create mode 100644 tests-fuzz/src/utils/procedure.rs create mode 100644 tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 529655cf9a..53f089bf51 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -419,7 +419,7 @@ jobs: timeout-minutes: 60 strategy: matrix: - target: ["fuzz_failover_mito_regions"] + target: ["fuzz_migrate_mito_regions", "fuzz_failover_mito_regions"] mode: - name: "Remote WAL" minio: true diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index 0dd1b05d7d..1cb647b022 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -127,3 +127,10 @@ path = "targets/failover/fuzz_failover_mito_regions.rs" test = false bench = false doc = false + +[[bin]] +name = "fuzz_migrate_mito_regions" +path = "targets/migration/fuzz_migrate_mito_regions.rs" +test = false +bench = false +doc = false diff --git a/tests-fuzz/src/context.rs b/tests-fuzz/src/context.rs index 97d7bf5e2c..8cfd0ca9fa 100644 --- a/tests-fuzz/src/context.rs +++ b/tests-fuzz/src/context.rs @@ -57,6 +57,11 @@ impl From<&CreateTableExpr> for TableContext { } impl TableContext { + /// Returns the timestamp column + pub fn timestamp_column(&self) -> Option { + self.columns.iter().find(|c| c.is_time_index()).cloned() + } + /// Applies the [AlterTableExpr]. pub fn alter(mut self, expr: AlterTableExpr) -> Result { match expr.alter_options { diff --git a/tests-fuzz/src/generator/create_expr.rs b/tests-fuzz/src/generator/create_expr.rs index b274f0b997..93809c06d4 100644 --- a/tests-fuzz/src/generator/create_expr.rs +++ b/tests-fuzz/src/generator/create_expr.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use derive_builder::Builder; +use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::partition::{PartitionBound, PartitionDef}; use rand::seq::SliceRandom; use rand::Rng; @@ -29,7 +30,7 @@ use crate::fake::{random_capitalize_map, MappedGenerator, WordGenerator}; use crate::generator::{ColumnOptionGenerator, ConcreteDataTypeGenerator, Random}; use crate::ir::create_expr::{ColumnOption, CreateDatabaseExprBuilder, CreateTableExprBuilder}; use crate::ir::{ - column_options_generator, generate_columns, generate_random_value, + column_options_generator, generate_columns, generate_partition_bounds, generate_random_value, partible_column_options_generator, primary_key_options_generator, ts_column_options_generator, Column, ColumnTypeGenerator, CreateDatabaseExpr, CreateTableExpr, Ident, PartibleColumnTypeGenerator, StringColumnTypeGenerator, TsColumnTypeGenerator, @@ -141,20 +142,12 @@ impl Generator for CreateTableExprGenerato .remove(0); // Generates partition bounds. - let mut partition_bounds = Vec::with_capacity(self.partition); - for _ in 0..self.partition - 1 { - partition_bounds.push(PartitionBound::Value(generate_random_value( - rng, - &column.column_type, - None, - ))); - partition_bounds.sort(); - } - partition_bounds.push(PartitionBound::MaxValue); - builder.partition(PartitionDef::new( - vec![name.value.to_string()], - partition_bounds, - )); + let partition_def = generate_partition_def( + self.partition, + column.column_type.clone(), + name.clone(), + ); + builder.partition(partition_def); columns.push(column); } // Generates the ts column. @@ -203,6 +196,45 @@ impl Generator for CreateTableExprGenerato } } +fn generate_partition_def( + partitions: usize, + column_type: ConcreteDataType, + column_name: Ident, +) -> PartitionDef { + let bounds = generate_partition_bounds(&column_type, partitions - 1); + let mut partition_bounds = Vec::with_capacity(partitions); + + let first_bound = bounds[0].clone(); + partition_bounds.push(PartitionBound::Expr(PartitionExpr::new( + Operand::Column(column_name.to_string()), + RestrictedOp::Lt, + Operand::Value(first_bound), + ))); + for bound_idx in 1..bounds.len() { + partition_bounds.push(PartitionBound::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column(column_name.to_string()), + RestrictedOp::GtEq, + Operand::Value(bounds[bound_idx - 1].clone()), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column(column_name.to_string()), + RestrictedOp::Lt, + Operand::Value(bounds[bound_idx].clone()), + )), + ))); + } + let last_bound = bounds.last().unwrap().clone(); + partition_bounds.push(PartitionBound::Expr(PartitionExpr::new( + Operand::Column(column_name.to_string()), + RestrictedOp::GtEq, + Operand::Value(last_bound), + ))); + + PartitionDef::new(vec![column_name.to_string()], partition_bounds) +} + /// Generate a physical table with 2 columns: ts of TimestampType::Millisecond as time index and val of Float64Type. #[derive(Builder)] #[builder(pattern = "owned")] @@ -400,7 +432,7 @@ mod tests { .unwrap(); let serialized = serde_json::to_string(&expr).unwrap(); - let expected = r#"{"table_name":{"value":"tEmporIbUS","quote_style":null},"columns":[{"name":{"value":"IMpEdIT","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey","NotNull"]},{"name":{"value":"natuS","quote_style":null},"column_type":{"Timestamp":{"Nanosecond":null}},"options":["TimeIndex"]},{"name":{"value":"ADIPisCI","quote_style":null},"column_type":{"Int16":{}},"options":[{"DefaultValue":{"Int16":4864}}]},{"name":{"value":"EXpEdita","quote_style":null},"column_type":{"Int64":{}},"options":["PrimaryKey"]},{"name":{"value":"cUlpA","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"MOLeStIAs","quote_style":null},"column_type":{"Boolean":null},"options":["Null"]},{"name":{"value":"cUmquE","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.21569687}}]},{"name":{"value":"toTAm","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"deBitIs","quote_style":null},"column_type":{"Float32":{}},"options":["Null"]},{"name":{"value":"QUi","quote_style":null},"column_type":{"Int64":{}},"options":["Null"]}],"if_not_exists":true,"partition":{"partition_columns":["IMpEdIT"],"partition_bounds":[{"Value":{"String":"򟘲"}},{"Value":{"String":"򴥫"}},"MaxValue"]},"engine":"mito2","options":{},"primary_keys":[0,3]}"#; + let expected = r#"{"table_name":{"value":"animI","quote_style":null},"columns":[{"name":{"value":"IMpEdIT","quote_style":null},"column_type":{"Float64":{}},"options":["PrimaryKey","NotNull"]},{"name":{"value":"natuS","quote_style":null},"column_type":{"Timestamp":{"Millisecond":null}},"options":["TimeIndex"]},{"name":{"value":"ADIPisCI","quote_style":null},"column_type":{"Float64":{}},"options":["Null"]},{"name":{"value":"EXpEdita","quote_style":null},"column_type":{"Int16":{}},"options":[{"DefaultValue":{"Int16":4864}}]},{"name":{"value":"cUlpA","quote_style":null},"column_type":{"Int64":{}},"options":["PrimaryKey"]},{"name":{"value":"MOLeStIAs","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"cUmquE","quote_style":null},"column_type":{"Boolean":null},"options":["Null"]},{"name":{"value":"toTAm","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.21569687}}]},{"name":{"value":"deBitIs","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"QUi","quote_style":null},"column_type":{"Float32":{}},"options":["Null"]}],"if_not_exists":true,"partition":{"partition_columns":["IMpEdIT"],"partition_bounds":[{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"Lt","rhs":{"Value":{"Float64":5.992310449541053e307}}}},{"Expr":{"lhs":{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"GtEq","rhs":{"Value":{"Float64":5.992310449541053e307}}}},"op":"And","rhs":{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"Lt","rhs":{"Value":{"Float64":1.1984620899082105e308}}}}}},{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"GtEq","rhs":{"Value":{"Float64":1.1984620899082105e308}}}}]},"engine":"mito2","options":{},"primary_keys":[0,4]}"#; assert_eq!(expected, serialized); } diff --git a/tests-fuzz/src/ir.rs b/tests-fuzz/src/ir.rs index 4777eb8166..53b3e42148 100644 --- a/tests-fuzz/src/ir.rs +++ b/tests-fuzz/src/ir.rs @@ -21,10 +21,11 @@ pub(crate) mod select_expr; use core::fmt; use std::collections::HashMap; -use std::sync::atomic::{AtomicI64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::Duration; pub use alter_expr::AlterTableExpr; +use common_time::timestamp::TimeUnit; use common_time::{Date, DateTime, Timestamp}; pub use create_expr::{CreateDatabaseExpr, CreateTableExpr}; use datatypes::data_type::ConcreteDataType; @@ -65,8 +66,6 @@ lazy_static! { ConcreteDataType::float32_datatype(), ConcreteDataType::float64_datatype(), ConcreteDataType::string_datatype(), - ConcreteDataType::date_datatype(), - ConcreteDataType::datetime_datatype(), ]; pub static ref STRING_DATA_TYPES: Vec = vec![ConcreteDataType::string_datatype()]; @@ -102,6 +101,43 @@ pub struct MySQLTsColumnTypeGenerator; pub struct PartibleColumnTypeGenerator; pub struct StringColumnTypeGenerator; +/// FIXME(weny): Waits for https://github.com/GreptimeTeam/greptimedb/issues/4247 +macro_rules! generate_values { + ($data_type:ty, $bounds:expr) => {{ + let base = 0 as $data_type; + let step = <$data_type>::MAX / ($bounds as $data_type + 1 as $data_type) as $data_type; + (1..=$bounds) + .map(|i| Value::from(base + step * i as $data_type as $data_type)) + .collect::>() + }}; +} + +/// Generates partition bounds. +pub fn generate_partition_bounds(datatype: &ConcreteDataType, bounds: usize) -> Vec { + match datatype { + ConcreteDataType::Int16(_) => generate_values!(i16, bounds), + ConcreteDataType::Int32(_) => generate_values!(i32, bounds), + ConcreteDataType::Int64(_) => generate_values!(i64, bounds), + ConcreteDataType::Float32(_) => generate_values!(f32, bounds), + ConcreteDataType::Float64(_) => generate_values!(f64, bounds), + ConcreteDataType::String(_) => { + let base = b'A'; + let range = b'z' - b'A'; + let step = range / (bounds as u8 + 1); + (1..=bounds) + .map(|i| { + Value::from( + char::from(base + step * i as u8) + .escape_default() + .to_string(), + ) + }) + .collect() + } + _ => unimplemented!("unsupported type: {datatype}"), + } +} + /// Generates a random [Value]. pub fn generate_random_value( rng: &mut R, @@ -128,15 +164,19 @@ pub fn generate_random_value( /// Generate monotonically increasing timestamps for MySQL. pub fn generate_unique_timestamp_for_mysql(base: i64) -> TsValueGenerator { - let base = Arc::new(AtomicI64::new(base)); + let base = Timestamp::new_millisecond(base); + let clock = Arc::new(Mutex::new(base)); Box::new(move |_rng, ts_type| -> Value { - let value = base.fetch_add(1, Ordering::Relaxed); + let mut clock = clock.lock().unwrap(); + let ts = clock.add_duration(Duration::from_secs(1)).unwrap(); + *clock = ts; + let v = match ts_type { - TimestampType::Second(_) => Timestamp::new_second(1 + value), - TimestampType::Millisecond(_) => Timestamp::new_millisecond(1000 + value), - TimestampType::Microsecond(_) => Timestamp::new_microsecond(1_000_000 + value), - TimestampType::Nanosecond(_) => Timestamp::new_nanosecond(1_000_000_000 + value), + TimestampType::Second(_) => ts.convert_to(TimeUnit::Second).unwrap(), + TimestampType::Millisecond(_) => ts.convert_to(TimeUnit::Millisecond).unwrap(), + TimestampType::Microsecond(_) => ts.convert_to(TimeUnit::Microsecond).unwrap(), + TimestampType::Nanosecond(_) => ts.convert_to(TimeUnit::Nanosecond).unwrap(), }; Value::from(v) }) @@ -496,6 +536,31 @@ pub fn replace_default( new_rows } +/// Sorts a vector of rows based on the values in the specified primary key columns. +pub fn sort_by_primary_keys(rows: &mut [RowValues], primary_keys_idx: Vec) { + rows.sort_by(|a, b| { + let a_keys: Vec<_> = primary_keys_idx.iter().map(|&i| &a[i]).collect(); + let b_keys: Vec<_> = primary_keys_idx.iter().map(|&i| &b[i]).collect(); + for (a_key, b_key) in a_keys.iter().zip(b_keys.iter()) { + match a_key.cmp(b_key) { + Some(std::cmp::Ordering::Equal) => continue, + non_eq => return non_eq.unwrap(), + } + } + std::cmp::Ordering::Equal + }); +} + +/// Formats a slice of columns into a comma-separated string of column names. +pub fn format_columns(columns: &[Column]) -> String { + columns + .iter() + .map(|c| c.name.to_string()) + .collect::>() + .join(", ") + .to_string() +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests-fuzz/src/ir/insert_expr.rs b/tests-fuzz/src/ir/insert_expr.rs index 1ecdb7eb58..da8ca202ca 100644 --- a/tests-fuzz/src/ir/insert_expr.rs +++ b/tests-fuzz/src/ir/insert_expr.rs @@ -25,6 +25,45 @@ pub struct InsertIntoExpr { pub values_list: Vec, } +impl InsertIntoExpr { + /// Returns the timestamp column + pub fn timestamp_column(&self) -> Option { + self.columns.iter().find(|c| c.is_time_index()).cloned() + } + + /// Returns index of the timestamp column + pub fn timestamp_column_idx(&self) -> Option { + self.columns + .iter() + .enumerate() + .find_map(|(idx, c)| if c.is_time_index() { Some(idx) } else { None }) + } + + /// Returns a vector of columns that are primary keys or time indices. + pub fn primary_key_columns(&self) -> Vec { + self.columns + .iter() + .filter(|c| c.is_primary_key() || c.is_time_index()) + .cloned() + .collect::>() + } + + /// Returns the indices of columns that are primary keys or time indices. + pub fn primary_key_column_idx(&self) -> Vec { + self.columns + .iter() + .enumerate() + .filter_map(|(i, c)| { + if c.is_primary_key() || c.is_time_index() { + Some(i) + } else { + None + } + }) + .collect::>() + } +} + pub type RowValues = Vec; #[derive(PartialEq, PartialOrd, Clone)] diff --git a/tests-fuzz/src/translator/mysql/create_expr.rs b/tests-fuzz/src/translator/mysql/create_expr.rs index 5e5e76fa55..3ce659bf6e 100644 --- a/tests-fuzz/src/translator/mysql/create_expr.rs +++ b/tests-fuzz/src/translator/mysql/create_expr.rs @@ -75,17 +75,12 @@ impl CreateTableExprTranslator { fn format_partition(input: &CreateTableExpr) -> Option { input.partition.as_ref().map(|partition| { format!( - "PARTITION BY RANGE COLUMNS({}) (\n{}\n)", + "PARTITION ON COLUMNS({}) (\n{}\n)", partition.partition_columns().join(", "), partition .partition_bounds() .iter() - .enumerate() - .map(|(i, bound)| format!( - "PARTITION r{} VALUES LESS THAN ({})", - i, - Self::format_partition_bound(bound) - )) + .map(Self::format_partition_bound) .collect::>() .join(",\n") ) @@ -138,12 +133,12 @@ impl CreateTableExprTranslator { fn format_table_options(input: &CreateTableExpr) -> String { let mut output = vec![]; - if !input.engine.is_empty() { - output.push(format!("ENGINE={}", input.engine)); - } if let Some(partition) = Self::format_partition(input) { output.push(partition); } + if !input.engine.is_empty() { + output.push(format!("ENGINE={}", input.engine)); + } output.join("\n") } @@ -187,7 +182,7 @@ impl CreateDatabaseExprTranslator { #[cfg(test)] mod tests { - use datatypes::value::Value; + use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::partition::{PartitionBound, PartitionDef}; use super::CreateTableExprTranslator; @@ -206,9 +201,29 @@ mod tests { .partition(PartitionDef::new( vec!["idc".to_string()], vec![ - PartitionBound::Value(Value::String("a".into())), - PartitionBound::Value(Value::String("f".into())), - PartitionBound::MaxValue, + PartitionBound::Expr(PartitionExpr::new( + Operand::Column("idc".to_string()), + RestrictedOp::Lt, + Operand::Value(datatypes::value::Value::Int32(10)), + )), + PartitionBound::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("idc".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::Int32(10)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("idc".to_string()), + RestrictedOp::Lt, + Operand::Value(datatypes::value::Value::Int32(50)), + )), + )), + PartitionBound::Expr(PartitionExpr::new( + Operand::Column("idc".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::Int32(50)), + )), ], )) .build() @@ -217,7 +232,6 @@ mod tests { let output = CreateTableExprTranslator .translate(&create_table_expr) .unwrap(); - assert_eq!( "CREATE TABLE system_metrics( host STRING, @@ -228,12 +242,12 @@ disk_util DOUBLE, ts TIMESTAMP(3) TIME INDEX, PRIMARY KEY(host, idc) ) -ENGINE=mito -PARTITION BY RANGE COLUMNS(idc) ( -PARTITION r0 VALUES LESS THAN ('a'), -PARTITION r1 VALUES LESS THAN ('f'), -PARTITION r2 VALUES LESS THAN (MAXVALUE) -);", +PARTITION ON COLUMNS(idc) ( +idc < 10, +idc >= 10 AND idc < 50, +idc >= 50 +) +ENGINE=mito;", output ); } diff --git a/tests-fuzz/src/utils.rs b/tests-fuzz/src/utils.rs index 0251e988d3..d1b75e51d3 100644 --- a/tests-fuzz/src/utils.rs +++ b/tests-fuzz/src/utils.rs @@ -16,8 +16,10 @@ pub mod cluster_info; pub mod config; pub mod crd; pub mod health; +pub mod migration; pub mod partition; pub mod pod_failure; +pub mod procedure; #[cfg(feature = "unstable")] pub mod process; pub mod wait; diff --git a/tests-fuzz/src/utils/migration.rs b/tests-fuzz/src/utils/migration.rs new file mode 100644 index 0000000000..385a5a93f3 --- /dev/null +++ b/tests-fuzz/src/utils/migration.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashSet}; +use std::time::Duration; + +use common_telemetry::info; +use snafu::ResultExt; +use sqlx::{MySql, Pool, Row}; +use store_api::storage::RegionId; + +use crate::error::{self}; +use crate::ir::Ident; +use crate::utils::partition::{fetch_partitions, region_distribution}; +use crate::utils::wait::wait_condition_fn; + +/// Migrates a region from one peer to another within a specified timeout. +/// +/// Returns the procedure id. +pub async fn migrate_region( + e: &Pool, + region_id: u64, + from_peer_id: u64, + to_peer_id: u64, + timeout_secs: u64, +) -> String { + let sql = format!( + "select migrate_region({region_id}, {from_peer_id}, {to_peer_id}, {timeout_secs}) as output;" + ); + let result = sqlx::query(&sql) + .fetch_one(e) + .await + .context(error::ExecuteQuerySnafu { sql }) + .unwrap(); + result.try_get(0).unwrap() +} + +/// Waits until the region distribution matches the expected distribution within a specified timeout. +pub async fn wait_for_region_distribution( + greptime: &Pool, + timeout: Duration, + table_name: Ident, + expected_region_distribution: BTreeMap>, +) { + wait_condition_fn( + timeout, + || { + let greptime = greptime.clone(); + let table_name = table_name.clone(); + Box::pin(async move { + let partitions = fetch_partitions(&greptime, table_name).await.unwrap(); + region_distribution(partitions) + .into_iter() + .map(|(datanode, regions)| { + (datanode, regions.into_iter().collect::>()) + }) + .collect::>() + }) + }, + move |region_distribution| { + info!("Region Distribution: {:?}", region_distribution); + if expected_region_distribution.keys().len() != region_distribution.keys().len() { + return false; + } + + for (datanode, expected_regions) in &expected_region_distribution { + match region_distribution.get(datanode) { + Some(regions) => { + if expected_regions != regions { + return false; + } + } + None => return false, + } + } + true + }, + Duration::from_secs(5), + ) + .await +} diff --git a/tests-fuzz/src/utils/procedure.rs b/tests-fuzz/src/utils/procedure.rs new file mode 100644 index 0000000000..3eab6fd9eb --- /dev/null +++ b/tests-fuzz/src/utils/procedure.rs @@ -0,0 +1,55 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common_telemetry::info; +use snafu::ResultExt; +use sqlx::{MySql, Pool, Row}; + +use super::wait::wait_condition_fn; +use crate::error; + +/// Fetches the state of a procedure. +pub async fn procedure_state(e: &Pool, procedure_id: String) -> String { + let sql = format!("select procedure_state(\"{procedure_id}\");"); + let result = sqlx::query(&sql) + .fetch_one(e) + .await + .context(error::ExecuteQuerySnafu { sql }) + .unwrap(); + result.try_get(0).unwrap() +} + +/// Waits for a procedure to finish within a specified timeout period. +pub async fn wait_for_procedure_finish( + greptime: &Pool, + timeout: Duration, + procedure_id: String, +) { + wait_condition_fn( + timeout, + || { + let greptime = greptime.clone(); + let procedure_id = procedure_id.clone(); + Box::pin(async move { procedure_state(&greptime, procedure_id).await }) + }, + |output| { + info!("Procedure({procedure_id}) state: {:?}", output); + output.to_lowercase().contains("done") + }, + Duration::from_secs(5), + ) + .await +} diff --git a/tests-fuzz/targets/fuzz_insert.rs b/tests-fuzz/targets/fuzz_insert.rs index e73d2287ba..1e75ef3b37 100644 --- a/tests-fuzz/targets/fuzz_insert.rs +++ b/tests-fuzz/targets/fuzz_insert.rs @@ -33,8 +33,8 @@ use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder; use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; use tests_fuzz::generator::Generator; use tests_fuzz::ir::{ - generate_random_timestamp_for_mysql, generate_random_value, replace_default, CreateTableExpr, - InsertIntoExpr, MySQLTsColumnTypeGenerator, + generate_random_timestamp_for_mysql, generate_random_value, replace_default, + sort_by_primary_keys, CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, }; use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; @@ -189,23 +189,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> { ); let fetched_rows = validator::row::fetch_values(&ctx.greptime, select_sql.as_str()).await?; let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, &insert_expr); - expected_rows.sort_by(|a, b| { - let a_keys: Vec<_> = primary_keys_idxs_in_insert_expr - .iter() - .map(|&i| &a[i]) - .collect(); - let b_keys: Vec<_> = primary_keys_idxs_in_insert_expr - .iter() - .map(|&i| &b[i]) - .collect(); - for (a_key, b_key) in a_keys.iter().zip(b_keys.iter()) { - match a_key.cmp(b_key) { - Some(std::cmp::Ordering::Equal) => continue, - non_eq => return non_eq.unwrap(), - } - } - std::cmp::Ordering::Equal - }); + sort_by_primary_keys(&mut expected_rows, primary_keys_idxs_in_insert_expr); validator::row::assert_eq::(&insert_expr.columns, &fetched_rows, &expected_rows)?; // Cleans up diff --git a/tests-fuzz/targets/fuzz_insert_logical_table.rs b/tests-fuzz/targets/fuzz_insert_logical_table.rs index 9d7a0a1c00..a9ca79bc2c 100644 --- a/tests-fuzz/targets/fuzz_insert_logical_table.rs +++ b/tests-fuzz/targets/fuzz_insert_logical_table.rs @@ -36,8 +36,8 @@ use tests_fuzz::generator::create_expr::{ use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; use tests_fuzz::generator::Generator; use tests_fuzz::ir::{ - generate_random_timestamp_for_mysql, generate_random_value, replace_default, CreateTableExpr, - InsertIntoExpr, + generate_random_timestamp_for_mysql, generate_random_value, replace_default, + sort_by_primary_keys, CreateTableExpr, InsertIntoExpr, }; use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; @@ -187,23 +187,7 @@ async fn validate_values( let fetched_rows = validator::row::fetch_values(&ctx.greptime, select_sql.as_str()).await?; let mut expected_rows = replace_default(&insert_expr.values_list, &logical_table_ctx, insert_expr); - expected_rows.sort_by(|a, b| { - let a_keys: Vec<_> = primary_keys_idxs_in_insert_expr - .iter() - .map(|&i| &a[i]) - .collect(); - let b_keys: Vec<_> = primary_keys_idxs_in_insert_expr - .iter() - .map(|&i| &b[i]) - .collect(); - for (a_key, b_key) in a_keys.iter().zip(b_keys.iter()) { - match a_key.cmp(b_key) { - Some(std::cmp::Ordering::Equal) => continue, - non_eq => return non_eq.unwrap(), - } - } - std::cmp::Ordering::Equal - }); + sort_by_primary_keys(&mut expected_rows, primary_keys_idxs_in_insert_expr); validator::row::assert_eq::(&insert_expr.columns, &fetched_rows, &expected_rows)?; Ok(()) diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs new file mode 100644 index 0000000000..d94742ab9b --- /dev/null +++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs @@ -0,0 +1,305 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![no_main] + +use std::collections::{BTreeMap, HashSet}; +use std::sync::Arc; +use std::time::Duration; + +use arbitrary::{Arbitrary, Unstructured}; +use common_telemetry::info; +use libfuzzer_sys::fuzz_target; +use rand::{Rng, SeedableRng}; +use rand_chacha::ChaChaRng; +use snafu::{ensure, ResultExt}; +use sqlx::{Executor, MySql, Pool}; +use store_api::storage::RegionId; +use tests_fuzz::context::{TableContext, TableContextRef}; +use tests_fuzz::error::{self, Result}; +use tests_fuzz::fake::{ + merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map, + MappedGenerator, WordGenerator, +}; +use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder; +use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; +use tests_fuzz::generator::Generator; +use tests_fuzz::ir::{ + format_columns, generate_random_value, generate_unique_timestamp_for_mysql, replace_default, + sort_by_primary_keys, CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, +}; +use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; +use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator; +use tests_fuzz::translator::DslTranslator; +use tests_fuzz::utils::cluster_info::{fetch_nodes, PEER_TYPE_DATANODE}; +use tests_fuzz::utils::migration::{migrate_region, wait_for_region_distribution}; +use tests_fuzz::utils::partition::{fetch_partitions, region_distribution}; +use tests_fuzz::utils::procedure::wait_for_procedure_finish; +use tests_fuzz::utils::{ + compact_table, flush_memtable, init_greptime_connections_via_env, Connections, +}; +use tests_fuzz::validator; + +struct FuzzContext { + greptime: Pool, +} + +impl FuzzContext { + async fn close(self) { + self.greptime.close().await; + } +} + +#[derive(Copy, Clone, Debug)] +struct FuzzInput { + seed: u64, + columns: usize, + partitions: usize, + rows: usize, + inserts: usize, +} + +impl Arbitrary<'_> for FuzzInput { + fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { + let seed = u.int_in_range(u64::MIN..=u64::MAX)?; + let mut rng = ChaChaRng::seed_from_u64(seed); + let partitions = rng.gen_range(3..32); + let columns = rng.gen_range(2..30); + let rows = rng.gen_range(128..1024); + let inserts = rng.gen_range(2..8); + Ok(FuzzInput { + partitions, + columns, + rows, + seed, + inserts, + }) + } +} + +fn generate_create_expr( + input: FuzzInput, + rng: &mut R, +) -> Result { + let create_table_generator = CreateTableExprGeneratorBuilder::default() + .name_generator(Box::new(MappedGenerator::new( + WordGenerator, + merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map), + ))) + .columns(input.columns) + .partition(input.partitions) + .engine("mito") + .ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator)) + .build() + .unwrap(); + create_table_generator.generate(rng) +} + +fn generate_insert_exprs( + input: FuzzInput, + rng: &mut R, + table_ctx: TableContextRef, +) -> Result> { + let omit_column_list = rng.gen_bool(0.2); + let insert_generator = InsertExprGeneratorBuilder::default() + .table_ctx(table_ctx.clone()) + .omit_column_list(omit_column_list) + .rows(input.rows) + .ts_value_generator(generate_unique_timestamp_for_mysql(0)) + .value_generator(Box::new(generate_random_value)) + .build() + .unwrap(); + (0..input.inserts) + .map(|_| insert_generator.generate(rng)) + .collect::>>() +} + +struct Migration { + from_peer: u64, + to_peer: u64, + region_id: RegionId, +} + +async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> { + info!("input: {:?}", input); + let mut rng = ChaChaRng::seed_from_u64(input.seed); + + let create_expr = generate_create_expr(input, &mut rng)?; + let translator = CreateTableExprTranslator; + let sql = translator.translate(&create_expr)?; + let _result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + + let table_ctx = Arc::new(TableContext::from(&create_expr)); + // Inserts data into the table + let insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?; + for insert_expr in &insert_exprs { + let translator = InsertIntoExprTranslator; + let sql = translator.translate(insert_expr)?; + let result = ctx + .greptime + // unprepared query, see + .execute(sql.as_str()) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + ensure!( + result.rows_affected() == input.rows as u64, + error::AssertSnafu { + reason: format!( + "expected rows affected: {}, actual: {}", + input.rows, + result.rows_affected(), + ) + } + ); + if rng.gen_bool(0.2) { + flush_memtable(&ctx.greptime, &create_expr.table_name).await?; + } + if rng.gen_bool(0.1) { + compact_table(&ctx.greptime, &create_expr.table_name).await?; + } + } + + // Fetches region distribution + let partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?; + let num_partitions = partitions.len(); + let region_distribution = region_distribution(partitions); + info!("Region distribution: {region_distribution:?}"); + let datanodes = fetch_nodes(&ctx.greptime) + .await? + .into_iter() + .flat_map(|node| { + if node.peer_type == PEER_TYPE_DATANODE { + Some(node) + } else { + None + } + }) + .collect::>(); + info!("List datanodes: {:?}", datanodes); + + // Generates region migration task. + let mut migrations = Vec::with_capacity(num_partitions); + let mut new_distribution: BTreeMap> = BTreeMap::new(); + for (datanode_id, regions) in region_distribution { + let step = rng.gen_range(1..datanodes.len()); + for region in regions { + let to_peer = (datanode_id + step as u64) % datanodes.len() as u64; + new_distribution.entry(to_peer).or_default().insert(region); + migrations.push(Migration { + from_peer: datanode_id, + to_peer, + region_id: region, + }) + } + } + + let mut procedure_ids = Vec::with_capacity(migrations.len()); + // Triggers region migrations + for Migration { + from_peer, + to_peer, + region_id, + } in &migrations + { + let procedure_id = + migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await; + info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}"); + procedure_ids.push(procedure_id); + } + info!("Excepted new region distribution: {new_distribution:?}"); + + // Waits for all region migrated + wait_for_region_distribution( + &ctx.greptime, + Duration::from_secs(120), + table_ctx.name.clone(), + new_distribution, + ) + .await; + + for procedure_id in procedure_ids { + wait_for_procedure_finish(&ctx.greptime, Duration::from_secs(120), procedure_id).await; + } + + // Values validation + info!("Validating rows"); + let ts_column = table_ctx.timestamp_column().unwrap(); + for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() { + let ts_column_idx = insert_expr.timestamp_column_idx().unwrap(); + let ts_value = insert_expr.values_list[0][ts_column_idx].clone(); + let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap(); + let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone(); + + let primary_keys_idx = insert_expr.primary_key_column_idx(); + let column_list = format_columns(&insert_expr.columns); + let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns()); + let select_sql = format!( + "SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};", + column_list, + create_expr.table_name, + ts_column.name, + ts_value, + ts_column.name, + next_batch_ts, + primary_keys_column_list + ); + info!("Executing sql: {select_sql}"); + let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap(); + let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr); + sort_by_primary_keys(&mut expected_rows, primary_keys_idx); + validator::row::assert_eq::(&insert_expr.columns, &fetched_rows, &expected_rows)?; + } + let insert_expr = insert_exprs.last().unwrap(); + let ts_column_idx = insert_expr.timestamp_column_idx().unwrap(); + let ts_value = insert_expr.values_list[0][ts_column_idx].clone(); + let primary_keys_idx = insert_expr.primary_key_column_idx(); + let column_list = format_columns(&insert_expr.columns); + let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns()); + let select_sql = format!( + "SELECT {} FROM {} WHERE {} >= {} ORDER BY {};", + column_list, create_expr.table_name, ts_column.name, ts_value, primary_keys_column_list + ); + info!("Executing sql: {select_sql}"); + let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap(); + let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr); + sort_by_primary_keys(&mut expected_rows, primary_keys_idx); + validator::row::assert_eq::(&insert_expr.columns, &fetched_rows, &expected_rows)?; + + // Cleans up + let sql = format!("DROP TABLE {}", table_ctx.name); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql })?; + info!("Drop table: {}\n\nResult: {result:?}\n\n", table_ctx.name); + ctx.close().await; + Ok(()) +} + +fuzz_target!(|input: FuzzInput| { + common_telemetry::init_default_ut_logging(); + common_runtime::block_on_write(async { + let Connections { mysql } = init_greptime_connections_via_env().await; + let ctx = FuzzContext { + greptime: mysql.expect("mysql connection init must be succeed"), + }; + + execute_region_migration(ctx, input) + .await + .unwrap_or_else(|err| panic!("fuzz test must be succeed: {err:?}")); + }) +});