test(fuzz): repartition validation and add dedicated CI GC profile (#7703)

* test(fuzz): add concurrent write loop and partition-bound value generation for repartition validation

Signed-off-by: WenyXu <wenymedia@gmail.com>

* ci: run repartition fuzz target with dedicated local-wal GC config

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fix typos

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fix typo

Signed-off-by: WenyXu <wenymedia@gmail.com>

* count distinct timestamp value

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-03-03 21:30:34 +08:00
committed by GitHub
parent f17a334058
commit b1b76fde0b
9 changed files with 369 additions and 14 deletions

View File

@@ -0,0 +1,39 @@
meta:
configData: |-
[runtime]
global_rt_size = 4
[datanode]
[datanode.client]
timeout = "120s"
[gc]
enable = true
datanode:
configData: |-
[runtime]
global_rt_size = 4
compact_rt_size = 2
[[region_engine]]
[region_engine.mito]
[region_engine.mito.gc]
enable = true
lingering_time = "0s"
unknown_file_lingering_time = "0s"
frontend:
configData: |-
[runtime]
global_rt_size = 4
[meta_client]
ddl_timeout = "120s"
objectStorage:
s3:
bucket: default
region: us-west-2
root: test-root
endpoint: http://minio.minio.svc.cluster.local
credentials:
accessKeyId: rootuser
secretAccessKey: rootpass123

View File

@@ -316,6 +316,13 @@ jobs:
minio: true
kafka: true
values: "with-remote-wal.yaml"
include:
- target: "fuzz_repartition_table"
mode:
name: "Local WAL Repartition GC"
minio: true
kafka: false
values: "with-minio-repartition-gc.yaml"
steps:
- name: Remove unused software
run: |

View File

@@ -27,7 +27,7 @@ use rand::Rng;
use crate::error::Error;
use crate::ir::create_expr::ColumnOption;
use crate::ir::{AlterTableExpr, CreateTableExpr, Ident};
use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident, RowValue};
pub type CreateTableExprGenerator<R> =
Box<dyn Generator<CreateTableExpr, R, Error = Error> + Sync + Send>;
@@ -44,6 +44,8 @@ pub type ValueGenerator<R> =
pub type TsValueGenerator<R> = Box<dyn Fn(&mut R, TimestampType) -> Value>;
pub type ValueOverride<R> = Box<dyn Fn(&Column, &mut R) -> Option<RowValue>>;
pub trait Generator<T, R: Rng> {
type Error: Sync + Send + fmt::Debug;

View File

@@ -23,7 +23,7 @@ use super::TsValueGenerator;
use crate::context::TableContextRef;
use crate::error::{Error, Result};
use crate::fake::WordGenerator;
use crate::generator::{Generator, Random, ValueGenerator};
use crate::generator::{Generator, Random, ValueGenerator, ValueOverride};
use crate::ir::insert_expr::{InsertIntoExpr, RowValue};
use crate::ir::{Ident, generate_random_timestamp, generate_random_value};
@@ -34,6 +34,10 @@ pub struct InsertExprGenerator<R: Rng + 'static> {
table_ctx: TableContextRef,
// Whether to omit all columns, i.e. INSERT INTO table_name VALUES (...)
omit_column_list: bool,
#[builder(default)]
required_columns: Vec<Ident>,
#[builder(default)]
value_overrides: Option<ValueOverride<R>>,
#[builder(default = "1")]
rows: usize,
#[builder(default = "Box::new(WordGenerator)")]
@@ -57,10 +61,11 @@ impl<R: Rng + 'static> Generator<InsertIntoExpr, R> for InsertExprGenerator<R> {
values_columns.clone_from(&self.table_ctx.columns);
} else {
for column in &self.table_ctx.columns {
let is_required = self.required_columns.contains(&column.name);
let can_omit = column.is_nullable() || column.has_default_value();
// 50% chance to omit a column if it's not required
if !can_omit || rng.random_bool(0.5) {
if is_required || !can_omit || rng.random_bool(0.5) {
values_columns.push(column.clone());
}
}
@@ -76,6 +81,12 @@ impl<R: Rng + 'static> Generator<InsertIntoExpr, R> for InsertExprGenerator<R> {
for _ in 0..self.rows {
let mut row = Vec::with_capacity(values_columns.len());
for column in &values_columns {
if let Some(override_fn) = self.value_overrides.as_ref()
&& let Some(value) = override_fn(column, rng)
{
row.push(value);
continue;
}
if column.is_nullable() && rng.random_bool(0.2) {
row.push(RowValue::Value(Value::Null));
continue;

View File

@@ -36,17 +36,19 @@ use datatypes::value::Value;
use derive_builder::Builder;
pub use insert_expr::InsertIntoExpr;
use lazy_static::lazy_static;
pub use partition_expr::SimplePartitions;
use rand::Rng;
use rand::seq::{IndexedRandom, SliceRandom};
pub use repartition_expr::RepartitionExpr;
use serde::{Deserialize, Serialize};
use self::insert_expr::{RowValue, RowValues};
use self::insert_expr::RowValues;
use crate::context::TableContextRef;
use crate::fake::WordGenerator;
use crate::generator::{Random, TsValueGenerator};
use crate::impl_random;
use crate::ir::create_expr::ColumnOption;
pub use crate::ir::insert_expr::RowValue;
lazy_static! {
pub static ref DATA_TYPES: Vec<ConcreteDataType> = vec![
@@ -168,8 +170,13 @@ pub fn generate_random_value<R: Rng>(
/// Generate monotonically increasing timestamps for MySQL.
pub fn generate_unique_timestamp_for_mysql<R: Rng>(base: i64) -> TsValueGenerator<R> {
let base = Timestamp::new_millisecond(base);
let clock = Arc::new(Mutex::new(base));
generate_unique_timestamp_for_mysql_with_clock(Arc::new(Mutex::new(base)))
}
/// Generates a unique timestamp for MySQL.
pub fn generate_unique_timestamp_for_mysql_with_clock<R: Rng>(
clock: Arc<Mutex<Timestamp>>,
) -> TsValueGenerator<R> {
Box::new(move |_rng, ts_type| -> Value {
let mut clock = clock.lock().unwrap();
let ts = clock.add_duration(Duration::from_secs(1)).unwrap();
@@ -255,6 +262,105 @@ fn generate_random_date<R: Rng>(rng: &mut R) -> Value {
Value::from(Date::from(date))
}
/// Generates a partition value for the given column type and bounds.
pub fn generate_partition_value<R: Rng + 'static>(
rng: &mut R,
column_type: &ConcreteDataType,
bounds: &[Value],
bound_idx: usize,
) -> Value {
if bounds.is_empty() {
return generate_random_value(rng, column_type, None);
}
let first = bounds.first().unwrap();
let last = bounds.last().unwrap();
match column_type {
datatypes::data_type::ConcreteDataType::Int16(_) => {
let first_value = match first {
datatypes::value::Value::Int16(v) => *v,
_ => 0,
};
if bound_idx == 0 {
datatypes::value::Value::from(first_value.saturating_sub(1))
} else if bound_idx < bounds.len() {
bounds[bound_idx - 1].clone()
} else {
last.clone()
}
}
datatypes::data_type::ConcreteDataType::Int32(_) => {
let first_value = match first {
datatypes::value::Value::Int32(v) => *v,
_ => 0,
};
if bound_idx == 0 {
datatypes::value::Value::from(first_value.saturating_sub(1))
} else if bound_idx < bounds.len() {
bounds[bound_idx - 1].clone()
} else {
last.clone()
}
}
datatypes::data_type::ConcreteDataType::Int64(_) => {
let first_value = match first {
datatypes::value::Value::Int64(v) => *v,
_ => 0,
};
if bound_idx == 0 {
datatypes::value::Value::from(first_value.saturating_sub(1))
} else if bound_idx < bounds.len() {
bounds[bound_idx - 1].clone()
} else {
last.clone()
}
}
datatypes::data_type::ConcreteDataType::Float32(_) => {
let first_value = match first {
datatypes::value::Value::Float32(v) => v.0,
_ => 0.0,
};
if bound_idx == 0 {
datatypes::value::Value::from(first_value - 1.0)
} else if bound_idx < bounds.len() {
bounds[bound_idx - 1].clone()
} else {
last.clone()
}
}
datatypes::data_type::ConcreteDataType::Float64(_) => {
let first_value = match first {
datatypes::value::Value::Float64(v) => v.0,
_ => 0.0,
};
if bound_idx == 0 {
datatypes::value::Value::from(first_value - 1.0)
} else if bound_idx < bounds.len() {
bounds[bound_idx - 1].clone()
} else {
last.clone()
}
}
datatypes::data_type::ConcreteDataType::String(_) => {
let upper = match first {
datatypes::value::Value::String(v) => v.as_utf8(),
_ => "",
};
if bound_idx == 0 {
if upper <= "A" {
datatypes::value::Value::from("")
} else {
datatypes::value::Value::from("A")
}
} else if bound_idx < bounds.len() {
bounds[bound_idx - 1].clone()
} else {
last.clone()
}
}
_ => unimplemented!("unsupported partition column type: {column_type}"),
}
}
/// An identifier.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd, Eq, Ord, Hash)]
pub struct Ident {

View File

@@ -73,6 +73,7 @@ pub enum RowValue {
}
impl RowValue {
#[allow(clippy::should_implement_trait)]
pub fn cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
match (self, other) {
(RowValue::Value(Value::Null), RowValue::Value(v2)) => v2.partial_cmp(&Value::Null),

View File

@@ -18,6 +18,7 @@ use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use rand::Rng;
use snafu::ensure;
use crate::context::TableContext;
use crate::error::{self, Result};
use crate::ir::{Ident, generate_random_value};
@@ -36,6 +37,7 @@ use crate::ir::{Ident, generate_random_value};
/// # Fields
/// - `column_name`: The name of the column used for partitioning.
/// - `bounds`: The partition boundary values; must be sorted for correct partitioning logic.
#[derive(Clone)]
pub struct SimplePartitions {
/// The column to partition by.
pub column_name: Ident,
@@ -136,6 +138,15 @@ impl SimplePartitions {
Ok(Self::new(column_name, bounds))
}
/// Reconstructs a `SimplePartitions` instance from a `TableContext`.
pub fn from_table_ctx(table_ctx: &TableContext) -> Result<Self> {
let partition_def = table_ctx
.partition
.as_ref()
.expect("expected partition def");
Self::from_exprs(partition_def.columns[0].clone(), &partition_def.exprs)
}
/// Inserts a new bound into the partition bounds and returns the index of the new bound.
pub fn insert_bound(&mut self, bound: Value) -> Result<usize> {
ensure!(

View File

@@ -52,6 +52,10 @@ pub async fn fetch_partitions_info_schema(
})
}
fn normalize(s: &str) -> String {
s.replace("\\\"", "\"").replace("\\\\", "\\")
}
/// Asserts the partitions are equal to the expected partitions.
pub fn assert_partitions(expected: &PartitionDef, actual: &[PartitionInfo]) -> Result<()> {
ensure!(
@@ -69,17 +73,17 @@ pub fn assert_partitions(expected: &PartitionDef, actual: &[PartitionInfo]) -> R
for expr in expected_exprs {
let actual_expr = actual
.iter()
.find(|info| info.partition_description == expr);
.find(|info| normalize(&info.partition_description) == normalize(&expr));
ensure!(
actual_expr.is_some(),
error::AssertSnafu {
reason: format!(
"Expected partition expression: {expr} not found, actual: {:?}",
"Expected partition expression: '{expr:?}' not found, actual: {:?}",
actual
.iter()
.map(|info| info.partition_description.clone())
.map(|info| format!("'{:?}'", info.partition_description.clone()))
.collect::<Vec<_>>()
.join(", ")
.join("; ")
),
}
);

View File

@@ -14,15 +14,18 @@
#![no_main]
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use arbitrary::{Arbitrary, Unstructured};
use common_telemetry::info;
use common_telemetry::{info, warn};
use common_time::Timestamp;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use snafu::ResultExt;
use sqlx::{MySql, Pool};
use sqlx::{Executor, MySql, Pool};
use tests_fuzz::context::{TableContext, TableContextRef};
use tests_fuzz::error::{self, Result};
use tests_fuzz::fake::{
@@ -31,19 +34,26 @@ use tests_fuzz::fake::{
};
use tests_fuzz::generator::Generator;
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
use tests_fuzz::generator::repartition_expr::{
MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder,
};
use tests_fuzz::ir::{CreateTableExpr, MySQLTsColumnTypeGenerator, RepartitionExpr};
use tests_fuzz::ir::{
CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, RepartitionExpr, RowValue,
SimplePartitions, generate_partition_value, generate_unique_timestamp_for_mysql_with_clock,
};
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::insert_expr::InsertIntoExprTranslator;
use tests_fuzz::translator::mysql::repartition_expr::RepartitionExprTranslator;
use tests_fuzz::utils::{
Connections, get_fuzz_override, get_gt_fuzz_input_max_alter_actions,
init_greptime_connections_via_env,
};
use tests_fuzz::validator;
use tests_fuzz::validator::row::count_values;
#[derive(Clone)]
struct FuzzContext {
greptime: Pool<MySql>,
}
@@ -96,6 +106,114 @@ fn generate_create_expr<R: Rng + 'static>(
create_table_generator.generate(rng)
}
struct SharedState {
table_ctx: TableContextRef,
clock: Arc<Mutex<Timestamp>>,
inserted_rows: u64,
running: bool,
}
fn build_insert_expr<R: Rng + 'static>(
table_ctx: &TableContextRef,
rng: &mut R,
partitions: &SimplePartitions,
clock: &Arc<Mutex<Timestamp>>,
) -> InsertIntoExpr {
let ts_value_generator = generate_unique_timestamp_for_mysql_with_clock(clock.clone());
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
let partition_len = table_ctx.partition.as_ref().unwrap().exprs.len();
let row = rng.random_range(partition_len..partition_len * 2);
let moved_partitions = partitions.clone();
let insert_generator = InsertExprGeneratorBuilder::default()
.table_ctx(table_ctx.clone())
.omit_column_list(false)
.rows(row)
.required_columns(vec![partitions.column_name.clone()])
.value_overrides(Some(Box::new(move |column, rng| {
if column.name.value == moved_partitions.column_name.value {
let bound_idx = counter_clone.fetch_add(1, Ordering::Relaxed) % partition_len;
return Some(RowValue::Value(generate_partition_value(
rng,
&column.column_type,
&moved_partitions.bounds,
bound_idx,
)));
}
None
})))
.ts_value_generator(ts_value_generator)
.build()
.unwrap();
insert_generator.generate(rng).unwrap()
}
async fn execute_insert_with_retry(ctx: &FuzzContext, sql: &str) -> Result<()> {
let mut delay = Duration::from_millis(100);
let mut attempt = 0;
let max_attempts = 10;
loop {
match ctx
.greptime
// unprepared query, see <https://github.com/GreptimeTeam/greptimedb/issues/3500>
.execute(sql)
.await
{
Ok(_) => {
return Ok(());
}
Err(err) => {
tokio::time::sleep(delay).await;
delay = std::cmp::min(delay * 2, Duration::from_secs(1));
attempt += 1;
warn!("Execute insert with retry: {sql}, attempt: {attempt}, error: {err:?}");
if attempt >= max_attempts {
return Err(err).context(error::ExecuteQuerySnafu { sql });
}
}
}
}
}
async fn write_loop<R: Rng + 'static>(
mut rng: R,
ctx: FuzzContext,
shared_state: Arc<Mutex<SharedState>>,
) -> Result<()> {
info!("Start write loop");
let clock = shared_state.lock().unwrap().clock.clone();
loop {
let (is_running, table_ctx) = {
let state = shared_state.lock().unwrap();
(state.running, state.table_ctx.clone())
};
if !is_running {
break;
}
let partitions = SimplePartitions::from_table_ctx(&table_ctx).unwrap();
let insert_expr = build_insert_expr(&table_ctx, &mut rng, &partitions, &clock);
let new_inserted_rows = insert_expr.values_list.len() as u64;
let translator = InsertIntoExprTranslator;
let sql = translator.translate(&insert_expr)?;
let now = Instant::now();
execute_insert_with_retry(&ctx, &sql).await?;
info!("Execute insert sql: {sql}, elapsed: {:?}", now.elapsed());
{
let mut state = shared_state.lock().unwrap();
state.inserted_rows += new_inserted_rows;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
info!("Write loop ended");
Ok(())
}
async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
info!("input: {input:?}");
let mut rng = ChaChaRng::seed_from_u64(input.seed);
@@ -111,6 +229,15 @@ async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result
// Repartition table
let mut table_ctx = Arc::new(TableContext::from(&expr));
let shared_state = Arc::new(Mutex::new(SharedState {
table_ctx: table_ctx.clone(),
clock: Arc::new(Mutex::new(Timestamp::current_millis())),
running: true,
inserted_rows: 0,
}));
let writer_rng = ChaChaRng::seed_from_u64(input.seed);
let writer_task = tokio::spawn(write_loop(writer_rng, ctx.clone(), shared_state.clone()));
for i in 0..input.actions {
let partition_num = table_ctx.partition.as_ref().unwrap().exprs.len();
info!(
@@ -129,6 +256,8 @@ async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result
.context(error::ExecuteQuerySnafu { sql: &sql })?;
info!("result: {result:?}");
table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).repartition(expr).unwrap());
// Updates the table partition in the shared state.
shared_state.lock().unwrap().table_ctx = table_ctx.clone();
// Validates partition expression
let partition_entries = validator::partition::fetch_partitions_info_schema(
@@ -137,13 +266,58 @@ async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result
&table_ctx.name,
)
.await?;
info!(
"partition_entries:\n{}",
partition_entries
.iter()
.map(|entry| {
format!(
r#" - table_catalog: {}
table_schema: {}
table_name: {}
partition_name: {}
partition_expression: {}
partition_description: {}
greptime_partition_id: {}
partition_ordinal_position: {}"#,
entry.table_catalog,
entry.table_schema,
entry.table_name,
entry.partition_name,
entry.partition_expression,
entry.partition_description,
entry.greptime_partition_id,
entry.partition_ordinal_position,
)
})
.collect::<Vec<_>>()
.join("\n")
);
validator::partition::assert_partitions(
table_ctx.partition.as_ref().unwrap(),
&partition_entries,
)?;
// TODO(weny): inserts data and validates the data
}
shared_state.lock().unwrap().running = false;
writer_task.await.unwrap().unwrap();
let count_sql = format!("SELECT COUNT(1) AS count FROM {}", table_ctx.name);
let count = count_values(&ctx.greptime, &count_sql).await?;
assert_eq!(
count.count as usize,
shared_state.lock().unwrap().inserted_rows as usize
);
let timestamp_column_name = table_ctx.timestamp_column().unwrap().name.clone();
// Since each timestamp value is unique, the count of distinct timestamps should match the total row count.
let distinct_count_sql = format!(
"SELECT COUNT(DISTINCT {}) AS count FROM {}",
timestamp_column_name, table_ctx.name
);
let distinct_count = count_values(&ctx.greptime, &distinct_count_sql).await?;
assert_eq!(
distinct_count.count as usize,
shared_state.lock().unwrap().inserted_rows as usize
);
// Cleans up
let table_name = table_ctx.name.clone();