feat: implement naive fuzz test for region migration (#4252)

* fix(fuzz): adapt for new partition rules

* feat: implement naive fuzz test for region migration

* chore(ci): add ci cfg

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-07-03 21:30:41 +08:00
committed by GitHub
parent 705b22411b
commit 76fac359cd
13 changed files with 670 additions and 86 deletions

View File

@@ -419,7 +419,7 @@ jobs:
timeout-minutes: 60
strategy:
matrix:
target: ["fuzz_failover_mito_regions"]
target: ["fuzz_migrate_mito_regions", "fuzz_failover_mito_regions"]
mode:
- name: "Remote WAL"
minio: true

View File

@@ -127,3 +127,10 @@ path = "targets/failover/fuzz_failover_mito_regions.rs"
test = false
bench = false
doc = false
[[bin]]
name = "fuzz_migrate_mito_regions"
path = "targets/migration/fuzz_migrate_mito_regions.rs"
test = false
bench = false
doc = false

View File

@@ -57,6 +57,11 @@ impl From<&CreateTableExpr> for TableContext {
}
impl TableContext {
/// Returns the timestamp column
pub fn timestamp_column(&self) -> Option<Column> {
self.columns.iter().find(|c| c.is_time_index()).cloned()
}
/// Applies the [AlterTableExpr].
pub fn alter(mut self, expr: AlterTableExpr) -> Result<TableContext> {
match expr.alter_options {

View File

@@ -17,6 +17,7 @@ use std::collections::HashMap;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use derive_builder::Builder;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::partition::{PartitionBound, PartitionDef};
use rand::seq::SliceRandom;
use rand::Rng;
@@ -29,7 +30,7 @@ use crate::fake::{random_capitalize_map, MappedGenerator, WordGenerator};
use crate::generator::{ColumnOptionGenerator, ConcreteDataTypeGenerator, Random};
use crate::ir::create_expr::{ColumnOption, CreateDatabaseExprBuilder, CreateTableExprBuilder};
use crate::ir::{
column_options_generator, generate_columns, generate_random_value,
column_options_generator, generate_columns, generate_partition_bounds, generate_random_value,
partible_column_options_generator, primary_key_options_generator, ts_column_options_generator,
Column, ColumnTypeGenerator, CreateDatabaseExpr, CreateTableExpr, Ident,
PartibleColumnTypeGenerator, StringColumnTypeGenerator, TsColumnTypeGenerator,
@@ -141,20 +142,12 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateTableExprGenerato
.remove(0);
// Generates partition bounds.
let mut partition_bounds = Vec::with_capacity(self.partition);
for _ in 0..self.partition - 1 {
partition_bounds.push(PartitionBound::Value(generate_random_value(
rng,
&column.column_type,
None,
)));
partition_bounds.sort();
}
partition_bounds.push(PartitionBound::MaxValue);
builder.partition(PartitionDef::new(
vec![name.value.to_string()],
partition_bounds,
));
let partition_def = generate_partition_def(
self.partition,
column.column_type.clone(),
name.clone(),
);
builder.partition(partition_def);
columns.push(column);
}
// Generates the ts column.
@@ -203,6 +196,45 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateTableExprGenerato
}
}
fn generate_partition_def(
partitions: usize,
column_type: ConcreteDataType,
column_name: Ident,
) -> PartitionDef {
let bounds = generate_partition_bounds(&column_type, partitions - 1);
let mut partition_bounds = Vec::with_capacity(partitions);
let first_bound = bounds[0].clone();
partition_bounds.push(PartitionBound::Expr(PartitionExpr::new(
Operand::Column(column_name.to_string()),
RestrictedOp::Lt,
Operand::Value(first_bound),
)));
for bound_idx in 1..bounds.len() {
partition_bounds.push(PartitionBound::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column(column_name.to_string()),
RestrictedOp::GtEq,
Operand::Value(bounds[bound_idx - 1].clone()),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column(column_name.to_string()),
RestrictedOp::Lt,
Operand::Value(bounds[bound_idx].clone()),
)),
)));
}
let last_bound = bounds.last().unwrap().clone();
partition_bounds.push(PartitionBound::Expr(PartitionExpr::new(
Operand::Column(column_name.to_string()),
RestrictedOp::GtEq,
Operand::Value(last_bound),
)));
PartitionDef::new(vec![column_name.to_string()], partition_bounds)
}
/// Generate a physical table with 2 columns: ts of TimestampType::Millisecond as time index and val of Float64Type.
#[derive(Builder)]
#[builder(pattern = "owned")]
@@ -400,7 +432,7 @@ mod tests {
.unwrap();
let serialized = serde_json::to_string(&expr).unwrap();
let expected = r#"{"table_name":{"value":"tEmporIbUS","quote_style":null},"columns":[{"name":{"value":"IMpEdIT","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey","NotNull"]},{"name":{"value":"natuS","quote_style":null},"column_type":{"Timestamp":{"Nanosecond":null}},"options":["TimeIndex"]},{"name":{"value":"ADIPisCI","quote_style":null},"column_type":{"Int16":{}},"options":[{"DefaultValue":{"Int16":4864}}]},{"name":{"value":"EXpEdita","quote_style":null},"column_type":{"Int64":{}},"options":["PrimaryKey"]},{"name":{"value":"cUlpA","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"MOLeStIAs","quote_style":null},"column_type":{"Boolean":null},"options":["Null"]},{"name":{"value":"cUmquE","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.21569687}}]},{"name":{"value":"toTAm","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"deBitIs","quote_style":null},"column_type":{"Float32":{}},"options":["Null"]},{"name":{"value":"QUi","quote_style":null},"column_type":{"Int64":{}},"options":["Null"]}],"if_not_exists":true,"partition":{"partition_columns":["IMpEdIT"],"partition_bounds":[{"Value":{"String":"򟘲"}},{"Value":{"String":"򴥫"}},"MaxValue"]},"engine":"mito2","options":{},"primary_keys":[0,3]}"#;
let expected = r#"{"table_name":{"value":"animI","quote_style":null},"columns":[{"name":{"value":"IMpEdIT","quote_style":null},"column_type":{"Float64":{}},"options":["PrimaryKey","NotNull"]},{"name":{"value":"natuS","quote_style":null},"column_type":{"Timestamp":{"Millisecond":null}},"options":["TimeIndex"]},{"name":{"value":"ADIPisCI","quote_style":null},"column_type":{"Float64":{}},"options":["Null"]},{"name":{"value":"EXpEdita","quote_style":null},"column_type":{"Int16":{}},"options":[{"DefaultValue":{"Int16":4864}}]},{"name":{"value":"cUlpA","quote_style":null},"column_type":{"Int64":{}},"options":["PrimaryKey"]},{"name":{"value":"MOLeStIAs","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"cUmquE","quote_style":null},"column_type":{"Boolean":null},"options":["Null"]},{"name":{"value":"toTAm","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.21569687}}]},{"name":{"value":"deBitIs","quote_style":null},"column_type":{"Float64":{}},"options":["NotNull"]},{"name":{"value":"QUi","quote_style":null},"column_type":{"Float32":{}},"options":["Null"]}],"if_not_exists":true,"partition":{"partition_columns":["IMpEdIT"],"partition_bounds":[{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"Lt","rhs":{"Value":{"Float64":5.992310449541053e307}}}},{"Expr":{"lhs":{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"GtEq","rhs":{"Value":{"Float64":5.992310449541053e307}}}},"op":"And","rhs":{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"Lt","rhs":{"Value":{"Float64":1.1984620899082105e308}}}}}},{"Expr":{"lhs":{"Column":"IMpEdIT"},"op":"GtEq","rhs":{"Value":{"Float64":1.1984620899082105e308}}}}]},"engine":"mito2","options":{},"primary_keys":[0,4]}"#;
assert_eq!(expected, serialized);
}

View File

@@ -21,10 +21,11 @@ pub(crate) mod select_expr;
use core::fmt;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
pub use alter_expr::AlterTableExpr;
use common_time::timestamp::TimeUnit;
use common_time::{Date, DateTime, Timestamp};
pub use create_expr::{CreateDatabaseExpr, CreateTableExpr};
use datatypes::data_type::ConcreteDataType;
@@ -65,8 +66,6 @@ lazy_static! {
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
ConcreteDataType::string_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
];
pub static ref STRING_DATA_TYPES: Vec<ConcreteDataType> =
vec![ConcreteDataType::string_datatype()];
@@ -102,6 +101,43 @@ pub struct MySQLTsColumnTypeGenerator;
pub struct PartibleColumnTypeGenerator;
pub struct StringColumnTypeGenerator;
/// FIXME(weny): Waits for https://github.com/GreptimeTeam/greptimedb/issues/4247
macro_rules! generate_values {
($data_type:ty, $bounds:expr) => {{
let base = 0 as $data_type;
let step = <$data_type>::MAX / ($bounds as $data_type + 1 as $data_type) as $data_type;
(1..=$bounds)
.map(|i| Value::from(base + step * i as $data_type as $data_type))
.collect::<Vec<Value>>()
}};
}
/// Generates partition bounds.
pub fn generate_partition_bounds(datatype: &ConcreteDataType, bounds: usize) -> Vec<Value> {
match datatype {
ConcreteDataType::Int16(_) => generate_values!(i16, bounds),
ConcreteDataType::Int32(_) => generate_values!(i32, bounds),
ConcreteDataType::Int64(_) => generate_values!(i64, bounds),
ConcreteDataType::Float32(_) => generate_values!(f32, bounds),
ConcreteDataType::Float64(_) => generate_values!(f64, bounds),
ConcreteDataType::String(_) => {
let base = b'A';
let range = b'z' - b'A';
let step = range / (bounds as u8 + 1);
(1..=bounds)
.map(|i| {
Value::from(
char::from(base + step * i as u8)
.escape_default()
.to_string(),
)
})
.collect()
}
_ => unimplemented!("unsupported type: {datatype}"),
}
}
/// Generates a random [Value].
pub fn generate_random_value<R: Rng>(
rng: &mut R,
@@ -128,15 +164,19 @@ pub fn generate_random_value<R: Rng>(
/// Generate monotonically increasing timestamps for MySQL.
pub fn generate_unique_timestamp_for_mysql<R: Rng>(base: i64) -> TsValueGenerator<R> {
let base = Arc::new(AtomicI64::new(base));
let base = Timestamp::new_millisecond(base);
let clock = Arc::new(Mutex::new(base));
Box::new(move |_rng, ts_type| -> Value {
let value = base.fetch_add(1, Ordering::Relaxed);
let mut clock = clock.lock().unwrap();
let ts = clock.add_duration(Duration::from_secs(1)).unwrap();
*clock = ts;
let v = match ts_type {
TimestampType::Second(_) => Timestamp::new_second(1 + value),
TimestampType::Millisecond(_) => Timestamp::new_millisecond(1000 + value),
TimestampType::Microsecond(_) => Timestamp::new_microsecond(1_000_000 + value),
TimestampType::Nanosecond(_) => Timestamp::new_nanosecond(1_000_000_000 + value),
TimestampType::Second(_) => ts.convert_to(TimeUnit::Second).unwrap(),
TimestampType::Millisecond(_) => ts.convert_to(TimeUnit::Millisecond).unwrap(),
TimestampType::Microsecond(_) => ts.convert_to(TimeUnit::Microsecond).unwrap(),
TimestampType::Nanosecond(_) => ts.convert_to(TimeUnit::Nanosecond).unwrap(),
};
Value::from(v)
})
@@ -496,6 +536,31 @@ pub fn replace_default(
new_rows
}
/// Sorts a vector of rows based on the values in the specified primary key columns.
pub fn sort_by_primary_keys(rows: &mut [RowValues], primary_keys_idx: Vec<usize>) {
rows.sort_by(|a, b| {
let a_keys: Vec<_> = primary_keys_idx.iter().map(|&i| &a[i]).collect();
let b_keys: Vec<_> = primary_keys_idx.iter().map(|&i| &b[i]).collect();
for (a_key, b_key) in a_keys.iter().zip(b_keys.iter()) {
match a_key.cmp(b_key) {
Some(std::cmp::Ordering::Equal) => continue,
non_eq => return non_eq.unwrap(),
}
}
std::cmp::Ordering::Equal
});
}
/// Formats a slice of columns into a comma-separated string of column names.
pub fn format_columns(columns: &[Column]) -> String {
columns
.iter()
.map(|c| c.name.to_string())
.collect::<Vec<_>>()
.join(", ")
.to_string()
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -25,6 +25,45 @@ pub struct InsertIntoExpr {
pub values_list: Vec<RowValues>,
}
impl InsertIntoExpr {
/// Returns the timestamp column
pub fn timestamp_column(&self) -> Option<Column> {
self.columns.iter().find(|c| c.is_time_index()).cloned()
}
/// Returns index of the timestamp column
pub fn timestamp_column_idx(&self) -> Option<usize> {
self.columns
.iter()
.enumerate()
.find_map(|(idx, c)| if c.is_time_index() { Some(idx) } else { None })
}
/// Returns a vector of columns that are primary keys or time indices.
pub fn primary_key_columns(&self) -> Vec<Column> {
self.columns
.iter()
.filter(|c| c.is_primary_key() || c.is_time_index())
.cloned()
.collect::<Vec<_>>()
}
/// Returns the indices of columns that are primary keys or time indices.
pub fn primary_key_column_idx(&self) -> Vec<usize> {
self.columns
.iter()
.enumerate()
.filter_map(|(i, c)| {
if c.is_primary_key() || c.is_time_index() {
Some(i)
} else {
None
}
})
.collect::<Vec<_>>()
}
}
pub type RowValues = Vec<RowValue>;
#[derive(PartialEq, PartialOrd, Clone)]

View File

@@ -75,17 +75,12 @@ impl CreateTableExprTranslator {
fn format_partition(input: &CreateTableExpr) -> Option<String> {
input.partition.as_ref().map(|partition| {
format!(
"PARTITION BY RANGE COLUMNS({}) (\n{}\n)",
"PARTITION ON COLUMNS({}) (\n{}\n)",
partition.partition_columns().join(", "),
partition
.partition_bounds()
.iter()
.enumerate()
.map(|(i, bound)| format!(
"PARTITION r{} VALUES LESS THAN ({})",
i,
Self::format_partition_bound(bound)
))
.map(Self::format_partition_bound)
.collect::<Vec<_>>()
.join(",\n")
)
@@ -138,12 +133,12 @@ impl CreateTableExprTranslator {
fn format_table_options(input: &CreateTableExpr) -> String {
let mut output = vec![];
if !input.engine.is_empty() {
output.push(format!("ENGINE={}", input.engine));
}
if let Some(partition) = Self::format_partition(input) {
output.push(partition);
}
if !input.engine.is_empty() {
output.push(format!("ENGINE={}", input.engine));
}
output.join("\n")
}
@@ -187,7 +182,7 @@ impl CreateDatabaseExprTranslator {
#[cfg(test)]
mod tests {
use datatypes::value::Value;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::partition::{PartitionBound, PartitionDef};
use super::CreateTableExprTranslator;
@@ -206,9 +201,29 @@ mod tests {
.partition(PartitionDef::new(
vec!["idc".to_string()],
vec![
PartitionBound::Value(Value::String("a".into())),
PartitionBound::Value(Value::String("f".into())),
PartitionBound::MaxValue,
PartitionBound::Expr(PartitionExpr::new(
Operand::Column("idc".to_string()),
RestrictedOp::Lt,
Operand::Value(datatypes::value::Value::Int32(10)),
)),
PartitionBound::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("idc".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::Int32(10)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("idc".to_string()),
RestrictedOp::Lt,
Operand::Value(datatypes::value::Value::Int32(50)),
)),
)),
PartitionBound::Expr(PartitionExpr::new(
Operand::Column("idc".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::Int32(50)),
)),
],
))
.build()
@@ -217,7 +232,6 @@ mod tests {
let output = CreateTableExprTranslator
.translate(&create_table_expr)
.unwrap();
assert_eq!(
"CREATE TABLE system_metrics(
host STRING,
@@ -228,12 +242,12 @@ disk_util DOUBLE,
ts TIMESTAMP(3) TIME INDEX,
PRIMARY KEY(host, idc)
)
ENGINE=mito
PARTITION BY RANGE COLUMNS(idc) (
PARTITION r0 VALUES LESS THAN ('a'),
PARTITION r1 VALUES LESS THAN ('f'),
PARTITION r2 VALUES LESS THAN (MAXVALUE)
);",
PARTITION ON COLUMNS(idc) (
idc < 10,
idc >= 10 AND idc < 50,
idc >= 50
)
ENGINE=mito;",
output
);
}

View File

@@ -16,8 +16,10 @@ pub mod cluster_info;
pub mod config;
pub mod crd;
pub mod health;
pub mod migration;
pub mod partition;
pub mod pod_failure;
pub mod procedure;
#[cfg(feature = "unstable")]
pub mod process;
pub mod wait;

View File

@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashSet};
use std::time::Duration;
use common_telemetry::info;
use snafu::ResultExt;
use sqlx::{MySql, Pool, Row};
use store_api::storage::RegionId;
use crate::error::{self};
use crate::ir::Ident;
use crate::utils::partition::{fetch_partitions, region_distribution};
use crate::utils::wait::wait_condition_fn;
/// Migrates a region from one peer to another within a specified timeout.
///
/// Returns the procedure id.
pub async fn migrate_region(
e: &Pool<MySql>,
region_id: u64,
from_peer_id: u64,
to_peer_id: u64,
timeout_secs: u64,
) -> String {
let sql = format!(
"select migrate_region({region_id}, {from_peer_id}, {to_peer_id}, {timeout_secs}) as output;"
);
let result = sqlx::query(&sql)
.fetch_one(e)
.await
.context(error::ExecuteQuerySnafu { sql })
.unwrap();
result.try_get(0).unwrap()
}
/// Waits until the region distribution matches the expected distribution within a specified timeout.
pub async fn wait_for_region_distribution(
greptime: &Pool<MySql>,
timeout: Duration,
table_name: Ident,
expected_region_distribution: BTreeMap<u64, HashSet<RegionId>>,
) {
wait_condition_fn(
timeout,
|| {
let greptime = greptime.clone();
let table_name = table_name.clone();
Box::pin(async move {
let partitions = fetch_partitions(&greptime, table_name).await.unwrap();
region_distribution(partitions)
.into_iter()
.map(|(datanode, regions)| {
(datanode, regions.into_iter().collect::<HashSet<_>>())
})
.collect::<BTreeMap<_, _>>()
})
},
move |region_distribution| {
info!("Region Distribution: {:?}", region_distribution);
if expected_region_distribution.keys().len() != region_distribution.keys().len() {
return false;
}
for (datanode, expected_regions) in &expected_region_distribution {
match region_distribution.get(datanode) {
Some(regions) => {
if expected_regions != regions {
return false;
}
}
None => return false,
}
}
true
},
Duration::from_secs(5),
)
.await
}

View File

@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_telemetry::info;
use snafu::ResultExt;
use sqlx::{MySql, Pool, Row};
use super::wait::wait_condition_fn;
use crate::error;
/// Fetches the state of a procedure.
pub async fn procedure_state(e: &Pool<MySql>, procedure_id: String) -> String {
let sql = format!("select procedure_state(\"{procedure_id}\");");
let result = sqlx::query(&sql)
.fetch_one(e)
.await
.context(error::ExecuteQuerySnafu { sql })
.unwrap();
result.try_get(0).unwrap()
}
/// Waits for a procedure to finish within a specified timeout period.
pub async fn wait_for_procedure_finish(
greptime: &Pool<MySql>,
timeout: Duration,
procedure_id: String,
) {
wait_condition_fn(
timeout,
|| {
let greptime = greptime.clone();
let procedure_id = procedure_id.clone();
Box::pin(async move { procedure_state(&greptime, procedure_id).await })
},
|output| {
info!("Procedure({procedure_id}) state: {:?}", output);
output.to_lowercase().contains("done")
},
Duration::from_secs(5),
)
.await
}

View File

@@ -33,8 +33,8 @@ use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
use tests_fuzz::generator::Generator;
use tests_fuzz::ir::{
generate_random_timestamp_for_mysql, generate_random_value, replace_default, CreateTableExpr,
InsertIntoExpr, MySQLTsColumnTypeGenerator,
generate_random_timestamp_for_mysql, generate_random_value, replace_default,
sort_by_primary_keys, CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator,
};
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
@@ -189,23 +189,7 @@ async fn execute_insert(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
);
let fetched_rows = validator::row::fetch_values(&ctx.greptime, select_sql.as_str()).await?;
let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, &insert_expr);
expected_rows.sort_by(|a, b| {
let a_keys: Vec<_> = primary_keys_idxs_in_insert_expr
.iter()
.map(|&i| &a[i])
.collect();
let b_keys: Vec<_> = primary_keys_idxs_in_insert_expr
.iter()
.map(|&i| &b[i])
.collect();
for (a_key, b_key) in a_keys.iter().zip(b_keys.iter()) {
match a_key.cmp(b_key) {
Some(std::cmp::Ordering::Equal) => continue,
non_eq => return non_eq.unwrap(),
}
}
std::cmp::Ordering::Equal
});
sort_by_primary_keys(&mut expected_rows, primary_keys_idxs_in_insert_expr);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
// Cleans up

View File

@@ -36,8 +36,8 @@ use tests_fuzz::generator::create_expr::{
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
use tests_fuzz::generator::Generator;
use tests_fuzz::ir::{
generate_random_timestamp_for_mysql, generate_random_value, replace_default, CreateTableExpr,
InsertIntoExpr,
generate_random_timestamp_for_mysql, generate_random_value, replace_default,
sort_by_primary_keys, CreateTableExpr, InsertIntoExpr,
};
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
@@ -187,23 +187,7 @@ async fn validate_values(
let fetched_rows = validator::row::fetch_values(&ctx.greptime, select_sql.as_str()).await?;
let mut expected_rows =
replace_default(&insert_expr.values_list, &logical_table_ctx, insert_expr);
expected_rows.sort_by(|a, b| {
let a_keys: Vec<_> = primary_keys_idxs_in_insert_expr
.iter()
.map(|&i| &a[i])
.collect();
let b_keys: Vec<_> = primary_keys_idxs_in_insert_expr
.iter()
.map(|&i| &b[i])
.collect();
for (a_key, b_key) in a_keys.iter().zip(b_keys.iter()) {
match a_key.cmp(b_key) {
Some(std::cmp::Ordering::Equal) => continue,
non_eq => return non_eq.unwrap(),
}
}
std::cmp::Ordering::Equal
});
sort_by_primary_keys(&mut expected_rows, primary_keys_idxs_in_insert_expr);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
Ok(())

View File

@@ -0,0 +1,305 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![no_main]
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured};
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use snafu::{ensure, ResultExt};
use sqlx::{Executor, MySql, Pool};
use store_api::storage::RegionId;
use tests_fuzz::context::{TableContext, TableContextRef};
use tests_fuzz::error::{self, Result};
use tests_fuzz::fake::{
merge_two_word_map_fn, random_capitalize_map, uppercase_and_keyword_backtick_map,
MappedGenerator, WordGenerator,
};
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
use tests_fuzz::generator::Generator;
use tests_fuzz::ir::{
format_columns, generate_random_value, generate_unique_timestamp_for_mysql, replace_default,
sort_by_primary_keys, CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator,
};
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::utils::cluster_info::{fetch_nodes, PEER_TYPE_DATANODE};
use tests_fuzz::utils::migration::{migrate_region, wait_for_region_distribution};
use tests_fuzz::utils::partition::{fetch_partitions, region_distribution};
use tests_fuzz::utils::procedure::wait_for_procedure_finish;
use tests_fuzz::utils::{
compact_table, flush_memtable, init_greptime_connections_via_env, Connections,
};
use tests_fuzz::validator;
struct FuzzContext {
greptime: Pool<MySql>,
}
impl FuzzContext {
async fn close(self) {
self.greptime.close().await;
}
}
#[derive(Copy, Clone, Debug)]
struct FuzzInput {
seed: u64,
columns: usize,
partitions: usize,
rows: usize,
inserts: usize,
}
impl Arbitrary<'_> for FuzzInput {
fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
let seed = u.int_in_range(u64::MIN..=u64::MAX)?;
let mut rng = ChaChaRng::seed_from_u64(seed);
let partitions = rng.gen_range(3..32);
let columns = rng.gen_range(2..30);
let rows = rng.gen_range(128..1024);
let inserts = rng.gen_range(2..8);
Ok(FuzzInput {
partitions,
columns,
rows,
seed,
inserts,
})
}
}
fn generate_create_expr<R: Rng + 'static>(
input: FuzzInput,
rng: &mut R,
) -> Result<CreateTableExpr> {
let create_table_generator = CreateTableExprGeneratorBuilder::default()
.name_generator(Box::new(MappedGenerator::new(
WordGenerator,
merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
)))
.columns(input.columns)
.partition(input.partitions)
.engine("mito")
.ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator))
.build()
.unwrap();
create_table_generator.generate(rng)
}
fn generate_insert_exprs<R: Rng + 'static>(
input: FuzzInput,
rng: &mut R,
table_ctx: TableContextRef,
) -> Result<Vec<InsertIntoExpr>> {
let omit_column_list = rng.gen_bool(0.2);
let insert_generator = InsertExprGeneratorBuilder::default()
.table_ctx(table_ctx.clone())
.omit_column_list(omit_column_list)
.rows(input.rows)
.ts_value_generator(generate_unique_timestamp_for_mysql(0))
.value_generator(Box::new(generate_random_value))
.build()
.unwrap();
(0..input.inserts)
.map(|_| insert_generator.generate(rng))
.collect::<Result<Vec<_>>>()
}
struct Migration {
from_peer: u64,
to_peer: u64,
region_id: RegionId,
}
async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
info!("input: {:?}", input);
let mut rng = ChaChaRng::seed_from_u64(input.seed);
let create_expr = generate_create_expr(input, &mut rng)?;
let translator = CreateTableExprTranslator;
let sql = translator.translate(&create_expr)?;
let _result = sqlx::query(&sql)
.execute(&ctx.greptime)
.await
.context(error::ExecuteQuerySnafu { sql: &sql })?;
let table_ctx = Arc::new(TableContext::from(&create_expr));
// Inserts data into the table
let insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?;
for insert_expr in &insert_exprs {
let translator = InsertIntoExprTranslator;
let sql = translator.translate(insert_expr)?;
let result = ctx
.greptime
// unprepared query, see <https://github.com/GreptimeTeam/greptimedb/issues/3500>
.execute(sql.as_str())
.await
.context(error::ExecuteQuerySnafu { sql: &sql })?;
ensure!(
result.rows_affected() == input.rows as u64,
error::AssertSnafu {
reason: format!(
"expected rows affected: {}, actual: {}",
input.rows,
result.rows_affected(),
)
}
);
if rng.gen_bool(0.2) {
flush_memtable(&ctx.greptime, &create_expr.table_name).await?;
}
if rng.gen_bool(0.1) {
compact_table(&ctx.greptime, &create_expr.table_name).await?;
}
}
// Fetches region distribution
let partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?;
let num_partitions = partitions.len();
let region_distribution = region_distribution(partitions);
info!("Region distribution: {region_distribution:?}");
let datanodes = fetch_nodes(&ctx.greptime)
.await?
.into_iter()
.flat_map(|node| {
if node.peer_type == PEER_TYPE_DATANODE {
Some(node)
} else {
None
}
})
.collect::<Vec<_>>();
info!("List datanodes: {:?}", datanodes);
// Generates region migration task.
let mut migrations = Vec::with_capacity(num_partitions);
let mut new_distribution: BTreeMap<u64, HashSet<_>> = BTreeMap::new();
for (datanode_id, regions) in region_distribution {
let step = rng.gen_range(1..datanodes.len());
for region in regions {
let to_peer = (datanode_id + step as u64) % datanodes.len() as u64;
new_distribution.entry(to_peer).or_default().insert(region);
migrations.push(Migration {
from_peer: datanode_id,
to_peer,
region_id: region,
})
}
}
let mut procedure_ids = Vec::with_capacity(migrations.len());
// Triggers region migrations
for Migration {
from_peer,
to_peer,
region_id,
} in &migrations
{
let procedure_id =
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await;
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
procedure_ids.push(procedure_id);
}
info!("Excepted new region distribution: {new_distribution:?}");
// Waits for all region migrated
wait_for_region_distribution(
&ctx.greptime,
Duration::from_secs(120),
table_ctx.name.clone(),
new_distribution,
)
.await;
for procedure_id in procedure_ids {
wait_for_procedure_finish(&ctx.greptime, Duration::from_secs(120), procedure_id).await;
}
// Values validation
info!("Validating rows");
let ts_column = table_ctx.timestamp_column().unwrap();
for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
column_list,
create_expr.table_name,
ts_column.name,
ts_value,
ts_column.name,
next_batch_ts,
primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
}
let insert_expr = insert_exprs.last().unwrap();
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
column_list, create_expr.table_name, ts_column.name, ts_value, primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
// Cleans up
let sql = format!("DROP TABLE {}", table_ctx.name);
let result = sqlx::query(&sql)
.execute(&ctx.greptime)
.await
.context(error::ExecuteQuerySnafu { sql })?;
info!("Drop table: {}\n\nResult: {result:?}\n\n", table_ctx.name);
ctx.close().await;
Ok(())
}
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
common_runtime::block_on_write(async {
let Connections { mysql } = init_greptime_connections_via_env().await;
let ctx = FuzzContext {
greptime: mysql.expect("mysql connection init must be succeed"),
};
execute_region_migration(ctx, input)
.await
.unwrap_or_else(|err| panic!("fuzz test must be succeed: {err:?}"));
})
});