test: add repartition fuzz target with partition IR and translators (#7666)

* test: add repartition fuzz target with partition IR and translators

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-02-05 14:04:04 +08:00
committed by GitHub
parent 8883022742
commit 7f47ce0e44
16 changed files with 1092 additions and 35 deletions

View File

@@ -93,6 +93,13 @@ test = false
bench = false
doc = false
[[bin]]
name = "fuzz_repartition_table"
path = "targets/ddl/fuzz_repartition_table.rs"
test = false
bench = false
doc = false
[[bin]]
name = "fuzz_alter_table"
path = "targets/ddl/fuzz_alter_table.rs"

View File

@@ -23,6 +23,8 @@ use crate::error::{self, Result};
use crate::generator::Random;
use crate::ir::alter_expr::{AlterTableOperation, AlterTableOption};
use crate::ir::create_expr::{ColumnOption, PartitionDef};
use crate::ir::partition_expr::SimplePartitions;
use crate::ir::repartition_expr::RepartitionExpr;
use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident};
pub type TableContextRef = Arc<TableContext>;
@@ -170,6 +172,39 @@ impl TableContext {
}
}
pub fn repartition(mut self, expr: RepartitionExpr) -> Result<TableContext> {
match expr {
RepartitionExpr::Split(split) => {
let partition_def = self.partition.as_mut().expect("expected partition def");
let insert_pos = partition_def
.exprs
.iter()
.position(|expr| expr == &split.target)
.unwrap();
partition_def.exprs[insert_pos] = split.into[0].clone();
partition_def
.exprs
.insert(insert_pos + 1, split.into[1].clone());
}
RepartitionExpr::Merge(merge) => {
let partition_def = self.partition.as_mut().expect("expected partition def");
let removed_idx = partition_def
.exprs
.iter()
.position(|expr| expr == &merge.targets[0])
.unwrap();
let mut partitions = SimplePartitions::from_exprs(
partition_def.columns[0].clone(),
&partition_def.exprs,
)?;
partitions.remove_bound(removed_idx)?;
partition_def.exprs = partitions.generate()?;
}
}
Ok(self)
}
pub fn generate_unique_column_name<R: Rng>(
&self,
rng: &mut R,
@@ -200,10 +235,16 @@ mod tests {
use common_query::AddColumnLocation;
use common_time::Duration;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use rand::SeedableRng;
use super::TableContext;
use crate::generator::Generator;
use crate::generator::create_expr::CreateTableExprGeneratorBuilder;
use crate::ir::alter_expr::{AlterTableOperation, AlterTableOption, Ttl};
use crate::ir::create_expr::ColumnOption;
use crate::ir::partition_expr::SimplePartitions;
use crate::ir::repartition_expr::{MergePartitionExpr, RepartitionExpr, SplitPartitionExpr};
use crate::ir::{AlterTableExpr, Column, Ident};
#[test]
@@ -296,4 +337,89 @@ mod tests {
let table_ctx = table_ctx.alter(expr).unwrap();
assert_eq!(table_ctx.table_options.len(), 0);
}
/// Splitting the last partition of a three-partition layout must yield the
/// four-partition layout built from bounds `[10, 20, 30]`.
#[test]
fn test_apply_split_partition_expr() {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0);
let expr = CreateTableExprGeneratorBuilder::default()
.columns(10)
.partition(10)
.if_not_exists(true)
.engine("mito2")
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
let mut table_ctx = TableContext::from(&expr);
// Initial layout (`age` stands in for the generated partition column):
// "age < 10"
// "age >= 10 AND age < 20"
// "age >= 20" (SPLIT) INTO ("age >= 20 AND age < 30", "age >= 30")
let partitions = SimplePartitions::new(
table_ctx.partition.as_ref().unwrap().columns[0].clone(),
vec![Value::from(10), Value::from(20)],
)
.generate()
.unwrap();
// Expected layout after the split:
// "age < 10"
// "age >= 10 AND age < 20"
// "age >= 20 AND age < 30"
// "age >= 30"
let expected_exprs = SimplePartitions::new(
table_ctx.partition.as_ref().unwrap().columns[0].clone(),
vec![Value::from(10), Value::from(20), Value::from(30)],
)
.generate()
.unwrap();
// Override the randomly generated partitions with the fixed initial layout.
table_ctx.partition.as_mut().unwrap().exprs = partitions.clone();
let table_ctx = table_ctx
.repartition(RepartitionExpr::Split(SplitPartitionExpr {
table_name: expr.table_name.clone(),
target: partitions.last().unwrap().clone(),
into: vec![expected_exprs[2].clone(), expected_exprs[3].clone()],
}))
.unwrap();
let partition_def = table_ctx.partition.as_ref().unwrap();
assert_eq!(partition_def.exprs, expected_exprs);
}
/// Merging the last two partitions of a three-partition layout must yield the
/// two-partition layout built from the single bound `[10]`.
#[test]
fn test_apply_merge_partition_expr() {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0);
let expr = CreateTableExprGeneratorBuilder::default()
.columns(10)
.partition(10)
.if_not_exists(true)
.engine("mito2")
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
let mut table_ctx = TableContext::from(&expr);
// Initial layout (`age` stands in for the generated partition column):
// "age < 10"
// "age >= 10 AND age < 20" (MERGE)
// "age >= 20" (MERGE)
let partitions = SimplePartitions::new(
table_ctx.partition.as_ref().unwrap().columns[0].clone(),
vec![Value::from(10), Value::from(20)],
)
.generate()
.unwrap();
// Expected layout after the merge:
// "age < 10"
// "age >= 10"
let expected_exprs = SimplePartitions::new(
table_ctx.partition.as_ref().unwrap().columns[0].clone(),
vec![Value::from(10)],
)
.generate()
.unwrap();
// Override the randomly generated partitions with the fixed initial layout.
table_ctx.partition.as_mut().unwrap().exprs = partitions.clone();
let table_ctx = table_ctx
.repartition(RepartitionExpr::Merge(MergePartitionExpr {
table_name: expr.table_name.clone(),
targets: vec![partitions[1].clone(), partitions[2].clone()],
}))
.unwrap();
let partition_def = table_ctx.partition.as_ref().unwrap();
assert_eq!(partition_def.exprs, expected_exprs);
}
}

View File

@@ -15,6 +15,7 @@
pub mod alter_expr;
pub mod create_expr;
pub mod insert_expr;
pub mod repartition_expr;
pub mod select_expr;
use std::fmt;

View File

@@ -17,7 +17,6 @@ use std::collections::HashMap;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use derive_builder::Builder;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use rand::Rng;
use rand::seq::SliceRandom;
use snafu::{ResultExt, ensure};
@@ -30,6 +29,7 @@ use crate::generator::{ColumnOptionGenerator, ConcreteDataTypeGenerator, Random}
use crate::ir::create_expr::{
ColumnOption, CreateDatabaseExprBuilder, CreateTableExprBuilder, PartitionDef,
};
use crate::ir::partition_expr::SimplePartitions;
use crate::ir::{
Column, ColumnTypeGenerator, CreateDatabaseExpr, CreateTableExpr, Ident,
PartibleColumnTypeGenerator, StringColumnTypeGenerator, TsColumnTypeGenerator,
@@ -184,38 +184,11 @@ fn generate_partition_def(
column_name: Ident,
) -> PartitionDef {
let bounds = generate_partition_bounds(&column_type, partitions - 1);
let mut partition_exprs = Vec::with_capacity(partitions);
let first_bound = bounds[0].clone();
partition_exprs.push(PartitionExpr::new(
Operand::Column(column_name.to_string()),
RestrictedOp::Lt,
Operand::Value(first_bound),
));
for bound_idx in 1..bounds.len() {
partition_exprs.push(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column(column_name.to_string()),
RestrictedOp::GtEq,
Operand::Value(bounds[bound_idx - 1].clone()),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column(column_name.to_string()),
RestrictedOp::Lt,
Operand::Value(bounds[bound_idx].clone()),
)),
));
}
let last_bound = bounds.last().unwrap().clone();
partition_exprs.push(PartitionExpr::new(
Operand::Column(column_name.to_string()),
RestrictedOp::GtEq,
Operand::Value(last_bound),
));
let partitions = SimplePartitions::new(column_name.clone(), bounds);
let partition_exprs = partitions.generate().unwrap();
PartitionDef {
columns: vec![column_name.to_string()],
columns: vec![column_name.clone()],
exprs: partition_exprs,
}
}
@@ -425,7 +398,7 @@ mod tests {
.unwrap();
let serialized = serde_json::to_string(&expr).unwrap();
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"columns":[{"name":{"value":"mOLEsTIAs","quote_style":null},"column_type":{"Float64":{}},"options":["PrimaryKey","Null"]},{"name":{"value":"CUMQUe","quote_style":null},"column_type":{"Timestamp":{"Second":null}},"options":["TimeIndex"]},{"name":{"value":"NaTus","quote_style":null},"column_type":{"Int64":{}},"options":[]},{"name":{"value":"EXPeDITA","quote_style":null},"column_type":{"Float64":{}},"options":[]},{"name":{"value":"ImPEDiT","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.56425774}}]},{"name":{"value":"ADIpisci","quote_style":null},"column_type":{"Float32":{}},"options":["PrimaryKey"]},{"name":{"value":"deBITIs","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.31315368}}]},{"name":{"value":"toTaM","quote_style":null},"column_type":{"Int32":{}},"options":["NotNull"]},{"name":{"value":"QuI","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.39941502}}]},{"name":{"value":"INVeNtOre","quote_style":null},"column_type":{"Boolean":null},"options":["PrimaryKey"]}],"if_not_exists":true,"partition":{"columns":["mOLEsTIAs"],"exprs":[{"lhs":{"Column":"mOLEsTIAs"},"op":"Lt","rhs":{"Value":{"Float64":5.992310449541053e+307}}},{"lhs":{"Expr":{"lhs":{"Column":"mOLEsTIAs"},"op":"GtEq","rhs":{"Value":{"Float64":5.992310449541053e+307}}}},"op":"And","rhs":{"Expr":{"lhs":{"Column":"mOLEsTIAs"},"op":"Lt","rhs":{"Value":{"Float64":1.1984620899082105e+308}}}}},{"lhs":{"Column":"mOLEsTIAs"},"op":"GtEq","rhs":{"Value":{"Float64":1.1984620899082105e+308}}}]},"engine":"mito2","options":{},"primary_keys":[0,5,9]}"#;
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"columns":[{"name":{"value":"mOLEsTIAs","quote_style":null},"column_type":{"Float64":{}},"options":["PrimaryKey","Null"]},{"name":{"value":"CUMQUe","quote_style":null},"column_type":{"Timestamp":{"Second":null}},"options":["TimeIndex"]},{"name":{"value":"NaTus","quote_style":null},"column_type":{"Int64":{}},"options":[]},{"name":{"value":"EXPeDITA","quote_style":null},"column_type":{"Float64":{}},"options":[]},{"name":{"value":"ImPEDiT","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.56425774}}]},{"name":{"value":"ADIpisci","quote_style":null},"column_type":{"Float32":{}},"options":["PrimaryKey"]},{"name":{"value":"deBITIs","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.31315368}}]},{"name":{"value":"toTaM","quote_style":null},"column_type":{"Int32":{}},"options":["NotNull"]},{"name":{"value":"QuI","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.39941502}}]},{"name":{"value":"INVeNtOre","quote_style":null},"column_type":{"Boolean":null},"options":["PrimaryKey"]}],"if_not_exists":true,"partition":{"columns":[{"value":"mOLEsTIAs","quote_style":null}],"exprs":[{"lhs":{"Column":"mOLEsTIAs"},"op":"Lt","rhs":{"Value":{"Float64":5.992310449541053e+307}}},{"lhs":{"Expr":{"lhs":{"Column":"mOLEsTIAs"},"op":"GtEq","rhs":{"Value":{"Float64":5.992310449541053e+307}}}},"op":"And","rhs":{"Expr":{"lhs":{"Column":"mOLEsTIAs"},"op":"Lt","rhs":{"Value":{"Float64":1.1984620899082105e+308}}}}},{"lhs":{"Column":"mOLEsTIAs"},"op":"GtEq","rhs":{"Value":{"Float64":1.1984620899082105e+308}}}]},"engine":"mito2","options":{},"primary_keys":[0,5,9]}"#;
assert_eq!(expected, serialized);
}

View File

@@ -0,0 +1,230 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use derive_builder::Builder;
use rand::Rng;
use crate::context::TableContextRef;
use crate::error::{Error, Result};
use crate::generator::Generator;
use crate::ir::partition_expr::{SimplePartitions, generate_unique_bound};
use crate::ir::repartition_expr::{MergePartitionExpr, SplitPartitionExpr};
/// Generates a [`SplitPartitionExpr`] for the table described by `table_ctx`.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct SplitPartitionExprGenerator {
// Table whose current partition definition is used as the starting layout.
table_ctx: TableContextRef,
}
impl<R: Rng + 'static> Generator<SplitPartitionExpr, R> for SplitPartitionExprGenerator {
    type Error = Error;

    /// Builds a split request by inserting one freshly generated bound into the
    /// bounds recovered from the table's current partition expressions.
    ///
    /// The existing expression at the new bound's sorted index becomes the split
    /// target; the two regenerated expressions at that index and the next one become
    /// the replacements.
    ///
    /// # Panics
    ///
    /// Panics if the table has no partition definition, or if the partition column
    /// is not among the table's columns.
    fn generate(&self, rng: &mut R) -> Result<SplitPartitionExpr> {
        let ctx = &self.table_ctx;
        let def = ctx.partition.as_ref().expect("expected partition def");
        let mut layout = SimplePartitions::from_exprs(def.columns[0].clone(), &def.exprs)?;

        // The partition column's data type drives the random bound generation.
        let Some(partition_column) = ctx
            .columns
            .iter()
            .find(|col| col.name.value == layout.column_name.value)
        else {
            let known: Vec<_> = ctx.columns.iter().map(|col| &col.name.value).collect();
            panic!(
                "partition column not found: {}, columns: {:?}",
                layout.column_name.value, known
            );
        };

        let fresh_bound =
            generate_unique_bound(rng, &partition_column.column_type, &layout.bounds)?;
        let split_at = layout.insert_bound(fresh_bound)?;
        let target = def.exprs[split_at].clone();

        let regenerated = layout.generate()?;
        let into = vec![
            regenerated[split_at].clone(),
            regenerated[split_at + 1].clone(),
        ];

        Ok(SplitPartitionExpr {
            table_name: ctx.name.clone(),
            target,
            into,
        })
    }
}
/// Generates a [`MergePartitionExpr`] for the table described by `table_ctx`.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct MergePartitionExprGenerator {
// Table whose current partition definition supplies the partitions to merge.
table_ctx: TableContextRef,
}
impl<R: Rng + 'static> Generator<MergePartitionExpr, R> for MergePartitionExprGenerator {
type Error = Error;
fn generate(&self, rng: &mut R) -> Result<MergePartitionExpr> {
let table_name = self.table_ctx.name.clone();
let partition_def = self
.table_ctx
.partition
.as_ref()
.expect("expected partition def");
let mut partitions =
SimplePartitions::from_exprs(partition_def.columns[0].clone(), &partition_def.exprs)?;
let remove_idx = rng.random_range(0..partitions.bounds.len());
partitions.remove_bound(remove_idx)?;
let left = partition_def.exprs[remove_idx].clone();
let right = partition_def.exprs[remove_idx + 1].clone();
Ok(MergePartitionExpr {
table_name,
targets: vec![left, right],
})
}
}
// Tests for the split/merge generators: two smoke tests driven by a random (logged)
// seed, and two deterministic tests that pin the exact SQL produced for seed 1234.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use rand::{Rng, SeedableRng};
use crate::context::TableContext;
use crate::generator::Generator;
use crate::generator::create_expr::CreateTableExprGeneratorBuilder;
use crate::generator::repartition_expr::{
MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder,
};
use crate::ir::repartition_expr::RepartitionExpr;
use crate::translator::DslTranslator;
use crate::translator::mysql::repartition_expr::RepartitionExprTranslator;
/// Smoke test: split generation succeeds for a randomly seeded table.
#[test]
fn test_split_partition_expr() {
common_telemetry::init_default_ut_logging();
let mut rng = rand::rng();
let seed = rng.random::<u64>();
// The seed is logged so that a failing run can be reproduced.
common_telemetry::info!("initializing rng with seed: {seed}");
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
let expr = CreateTableExprGeneratorBuilder::default()
.columns(10)
.partition(10)
.if_not_exists(true)
.engine("mito2")
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
let table_ctx = Arc::new(TableContext::from(&expr));
SplitPartitionExprGeneratorBuilder::default()
.table_ctx(table_ctx)
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
}
/// With a fixed seed, the generated split translates to a known SQL statement.
#[test]
fn test_split_partition_expr_deterministic() {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1234);
let expr = CreateTableExprGeneratorBuilder::default()
.columns(10)
.partition(10)
.if_not_exists(true)
.engine("mito2")
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
let table_ctx = Arc::new(TableContext::from(&expr));
let expr = SplitPartitionExprGeneratorBuilder::default()
.table_ctx(table_ctx)
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
let sql = RepartitionExprTranslator
.translate(&RepartitionExpr::Split(expr))
.unwrap();
let expected = r#"ALTER TABLE quO SPLIT PARTITION (
MaGnI >= 1844674407370955160 AND MaGnI < 2767011611056432740
) INTO (
MaGnI >= 1844674407370955160 AND MaGnI < 2290232243991136014,
MaGnI >= 2290232243991136014 AND MaGnI < 2767011611056432740
);"#;
assert_eq!(expected, sql);
}
/// Smoke test: merge generation succeeds for a randomly seeded table.
#[test]
fn test_merge_partition_expr() {
common_telemetry::init_default_ut_logging();
let mut rng = rand::rng();
let seed = rng.random::<u64>();
// The seed is logged so that a failing run can be reproduced.
common_telemetry::info!("initializing rng with seed: {seed}");
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
let expr = CreateTableExprGeneratorBuilder::default()
.columns(10)
.partition(10)
.if_not_exists(true)
.engine("mito2")
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
let table_ctx = Arc::new(TableContext::from(&expr));
MergePartitionExprGeneratorBuilder::default()
.table_ctx(table_ctx)
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
}
/// With a fixed seed, the generated merge translates to a known SQL statement.
#[test]
fn test_merge_partition_expr_deterministic() {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1234);
let expr = CreateTableExprGeneratorBuilder::default()
.columns(10)
.partition(10)
.if_not_exists(true)
.engine("mito2")
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
let table_ctx = Arc::new(TableContext::from(&expr));
let expr = MergePartitionExprGeneratorBuilder::default()
.table_ctx(table_ctx)
.build()
.unwrap()
.generate(&mut rng)
.unwrap();
let sql = RepartitionExprTranslator
.translate(&RepartitionExpr::Merge(expr))
.unwrap();
let expected = r#"ALTER TABLE quO MERGE PARTITION (
MaGnI >= 3689348814741910320 AND MaGnI < 4611686018427387900,
MaGnI >= 4611686018427387900 AND MaGnI < 5534023222112865480
);"#;
assert_eq!(expected, sql);
}
}

View File

@@ -17,6 +17,8 @@
pub(crate) mod alter_expr;
pub(crate) mod create_expr;
pub(crate) mod insert_expr;
pub(crate) mod partition_expr;
pub(crate) mod repartition_expr;
pub(crate) mod select_expr;
use core::fmt;
@@ -36,6 +38,7 @@ pub use insert_expr::InsertIntoExpr;
use lazy_static::lazy_static;
use rand::Rng;
use rand::seq::{IndexedRandom, SliceRandom};
pub use repartition_expr::RepartitionExpr;
use serde::{Deserialize, Serialize};
use self::insert_expr::{RowValue, RowValues};

View File

@@ -72,7 +72,7 @@ pub struct CreateTableExpr {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionDef {
pub columns: Vec<String>,
pub columns: Vec<Ident>,
pub exprs: Vec<PartitionExpr>,
}

View File

@@ -0,0 +1,272 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use rand::Rng;
use snafu::ensure;
use crate::error::{self, Result};
use crate::ir::{Ident, generate_random_value};
/// A partitioning scheme that divides a single column into multiple ranges based on provided bounds.
///
/// `SimplePartitions` is designed for range-based partitioning on a given column using explicit boundary values.
/// Each partition is represented by an interval. With `n` bounds, there are `n+1` resulting partitions.
///
/// # Example
///
/// Partitioning by the column `"age"` with bounds `[10, 20]` generates the following partitions:
/// - Partition 1: `age < 10`
/// - Partition 2: `age >= 10 AND age < 20`
/// - Partition 3: `age >= 20`
///
/// # Fields
/// - `column_name`: The name of the column used for partitioning.
/// - `bounds`: The partition boundary values. `generate` sorts a copy of them before
///   building expressions, while `remove_bound` addresses them by their stored position.
pub struct SimplePartitions {
/// The column to partition by.
pub column_name: Ident,
/// The boundaries that define each partition range.
///
/// With `k` bounds, the following partitions are created:
/// - `< bound[0]`
/// - `[bound[0], bound[1])`
/// - ...
/// - `>= bound[k-1]`
pub bounds: Vec<Value>,
}
impl SimplePartitions {
pub fn new(column_name: Ident, bounds: Vec<Value>) -> Self {
Self {
column_name,
bounds,
}
}
/// Generates partition expressions for the defined bounds on a single column.
///
/// Returns a vector of `PartitionExpr` representing all resulting partitions.
pub fn generate(&self) -> Result<Vec<PartitionExpr>> {
ensure!(
!self.bounds.is_empty(),
error::UnexpectedSnafu {
violated: "partition bounds must not be empty".to_string(),
}
);
let mut sorted = self.bounds.clone();
sorted.sort();
let mut exprs = Vec::with_capacity(sorted.len() + 1);
let first_bound = sorted[0].clone();
let column_name = self.column_name.value.clone();
exprs.push(PartitionExpr::new(
Operand::Column(column_name.clone()),
RestrictedOp::Lt,
Operand::Value(first_bound),
));
for bound_idx in 1..sorted.len() {
exprs.push(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column(column_name.clone()),
RestrictedOp::GtEq,
Operand::Value(sorted[bound_idx - 1].clone()),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column(column_name.clone()),
RestrictedOp::Lt,
Operand::Value(sorted[bound_idx].clone()),
)),
));
}
let last_bound = sorted.last().unwrap().clone();
exprs.push(PartitionExpr::new(
Operand::Column(column_name.clone()),
RestrictedOp::GtEq,
Operand::Value(last_bound),
));
Ok(exprs)
}
/// Reconstructs a `SimplePartitions` instance from a slice of `PartitionExpr`.
///
/// This extracts the upper-bound partition values for a given column from a list of
/// partition expressions (typically produced by `generate`). Only expressions that
/// define an upper bound for the column are included. Must not be passed an empty slice.
///
/// # Arguments
///
/// * `column_name` - The name of the column to partition by.
/// * `exprs` - A list of partition expressions, each specifying a partition's bounding condition.
///
/// # Errors
///
/// Returns an error if `exprs` is empty or if any expression cannot be parsed for bounds.
///
/// # Returns
///
/// A [`SimplePartitions`] value, where the bounds vector contains each extracted upper bound.
pub fn from_exprs(column_name: Ident, exprs: &[PartitionExpr]) -> Result<Self> {
ensure!(
!exprs.is_empty(),
error::UnexpectedSnafu {
violated: "partition exprs must not be empty".to_string(),
}
);
let mut bounds = Vec::new();
for expr in exprs {
if let Some(bound) = extract_upper_bound(&column_name.value, expr)? {
bounds.push(bound);
}
}
Ok(Self::new(column_name, bounds))
}
/// Inserts a new bound into the partition bounds and returns the index of the new bound.
pub fn insert_bound(&mut self, bound: Value) -> Result<usize> {
ensure!(
!self.bounds.contains(&bound),
error::UnexpectedSnafu {
violated: format!("duplicate bound: {bound}"),
}
);
self.bounds.push(bound.clone());
self.bounds.sort();
let insert_pos = self.bounds.binary_search(&bound).unwrap();
Ok(insert_pos)
}
/// Removes a bound at the specified index and returns the removed bound.
pub fn remove_bound(&mut self, idx: usize) -> Result<Value> {
ensure!(
idx < self.bounds.len(),
error::UnexpectedSnafu {
violated: format!("index out of bounds: {idx}"),
}
);
Ok(self.bounds.remove(idx))
}
}
/// Returns the upper bound that `expr` places on `column`, if any.
///
/// * `<` / `<=` comparisons yield their value.
/// * `>` / `>=` comparisons yield `None`.
/// * `AND` of two comparisons yields the value of whichever side is an upper-bound
///   comparison, or `None` when both or neither side is one.
///
/// Any other operator is rejected with an error.
fn extract_upper_bound(column: &str, expr: &PartitionExpr) -> Result<Option<Value>> {
    match expr.op {
        RestrictedOp::Gt | RestrictedOp::GtEq => Ok(None),
        RestrictedOp::Lt | RestrictedOp::LtEq => extract_column_value(column, expr).map(Some),
        RestrictedOp::And => {
            let lhs_expr = extract_expr_operand(expr.lhs.as_ref())?;
            let rhs_expr = extract_expr_operand(expr.rhs.as_ref())?;
            let (lhs_op, lhs_value) = extract_bound_operand(column, lhs_expr)?;
            let (rhs_op, rhs_value) = extract_bound_operand(column, rhs_expr)?;

            let lhs_is_upper = is_upper_op(&lhs_op);
            let rhs_is_upper = is_upper_op(&rhs_op);
            let upper = if lhs_is_upper == rhs_is_upper {
                // Both sides or neither side bound the column from above.
                None
            } else if lhs_is_upper {
                Some(lhs_value)
            } else {
                Some(rhs_value)
            };
            Ok(upper)
        }
        _ => error::UnexpectedSnafu {
            violated: format!("unsupported partition op: {:?}", expr.op),
        }
        .fail(),
    }
}
fn extract_expr_operand(operand: &Operand) -> Result<&PartitionExpr> {
match operand {
Operand::Expr(expr) => Ok(expr),
_ => error::UnexpectedSnafu {
violated: "expected partition expr operand".to_string(),
}
.fail(),
}
}
fn extract_bound_operand(column: &str, expr: &PartitionExpr) -> Result<(RestrictedOp, Value)> {
let lhs = expr.lhs.as_ref();
let rhs = expr.rhs.as_ref();
match (lhs, rhs) {
(Operand::Column(col), Operand::Value(value)) if col == column => {
Ok((expr.op.clone(), value.clone()))
}
_ => error::UnexpectedSnafu {
violated: format!("unexpected partition expr for column: {column}"),
}
.fail(),
}
}
fn extract_column_value(column: &str, expr: &PartitionExpr) -> Result<Value> {
let (op, value) = extract_bound_operand(column, expr)?;
ensure!(
is_upper_op(&op),
error::UnexpectedSnafu {
violated: format!("expected upper bound op, got: {:?}", op),
}
);
Ok(value)
}
/// Whether `op` bounds a column from above (`<` or `<=`).
fn is_upper_op(op: &RestrictedOp) -> bool {
    match op {
        RestrictedOp::Lt | RestrictedOp::LtEq => true,
        _ => false,
    }
}
/// Generates a unique partition bound that is not in the given bounds.
pub fn generate_unique_bound<R: Rng + 'static>(
rng: &mut R,
datatype: &ConcreteDataType,
bounds: &[Value],
) -> Result<Value> {
for _ in 0..16 {
let candidate = generate_random_value(rng, datatype, None);
if !bounds.contains(&candidate) {
return Ok(candidate);
}
}
error::UnexpectedSnafu {
violated: "unable to generate unique partition bound".to_string(),
}
.fail()
}
#[cfg(test)]
mod tests {
use super::*;
/// Two bounds produce three partitions with the expected textual form.
#[test]
fn test_simple_partitions() {
let partitions =
SimplePartitions::new(Ident::new("age"), vec![Value::from(10), Value::from(20)]);
let exprs = partitions.generate().unwrap();
assert_eq!(exprs.len(), 3);
assert_eq!(exprs[0].to_string(), "age < 10");
assert_eq!(exprs[1].to_string(), "age >= 10 AND age < 20");
assert_eq!(exprs[2].to_string(), "age >= 20");
}
/// `from_exprs` recovers the bounds that `generate` was given (round trip).
#[test]
fn test_simple_partitions_from_exprs() {
let partitions =
SimplePartitions::new(Ident::new("age"), vec![Value::from(10), Value::from(20)]);
let exprs = partitions.generate().unwrap();
let partitions = SimplePartitions::from_exprs(Ident::new("age"), &exprs).unwrap();
assert_eq!(partitions.bounds, vec![Value::from(10), Value::from(20)]);
}
}

View File

@@ -0,0 +1,37 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use crate::ir::Ident;
/// Describes splitting one partition of a table into several partitions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SplitPartitionExpr {
/// The table to repartition.
pub table_name: Ident,
/// The existing partition expression to split.
pub target: PartitionExpr,
/// The partition expressions that replace `target`.
pub into: Vec<PartitionExpr>,
}
/// Describes merging several partitions of a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergePartitionExpr {
/// The table to repartition.
pub table_name: Ident,
/// The existing partition expressions to merge.
pub targets: Vec<PartitionExpr>,
}
/// A repartition operation: either a split or a merge.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RepartitionExpr {
/// Split one partition into several.
Split(SplitPartitionExpr),
/// Merge several partitions.
Merge(MergePartitionExpr),
}

View File

@@ -15,4 +15,5 @@
pub mod alter_expr;
pub mod create_expr;
pub mod insert_expr;
pub mod repartition_expr;
pub mod select_expr;

View File

@@ -74,7 +74,12 @@ impl CreateTableExprTranslator {
input.partition.as_ref().map(|partition| {
format!(
"PARTITION ON COLUMNS({}) (\n{}\n)",
partition.columns.join(", "),
partition
.columns
.iter()
.map(|c| c.to_string())
.collect::<Vec<_>>()
.join(", "),
partition
.exprs
.iter()
@@ -172,6 +177,7 @@ mod tests {
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use super::CreateTableExprTranslator;
use crate::ir::Ident;
use crate::ir::create_expr::{CreateDatabaseExprBuilder, CreateTableExprBuilder, PartitionDef};
use crate::test_utils;
use crate::translator::DslTranslator;
@@ -185,7 +191,7 @@ mod tests {
.engine("mito")
.primary_keys(vec![0, 1])
.partition(PartitionDef {
columns: vec!["idc".to_string()],
columns: vec![Ident::new("idc")],
exprs: vec![
PartitionExpr::new(
Operand::Column("idc".to_string()),

View File

@@ -0,0 +1,113 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use partition::expr::PartitionExpr;
use crate::error::Result;
use crate::ir::repartition_expr::{MergePartitionExpr, RepartitionExpr, SplitPartitionExpr};
use crate::translator::DslTranslator;
pub struct RepartitionExprTranslator;
impl DslTranslator<RepartitionExpr, String> for RepartitionExprTranslator {
type Error = crate::error::Error;
fn translate(&self, input: &RepartitionExpr) -> Result<String> {
match input {
RepartitionExpr::Split(SplitPartitionExpr {
table_name,
target,
into,
}) => {
let target_expr = format_partition_expr_sql(target);
let into_exprs = into
.iter()
.map(format_partition_expr_sql)
.collect::<Vec<_>>()
.join(",\n ");
Ok(format!(
"ALTER TABLE {} SPLIT PARTITION (\n {}\n) INTO (\n {}\n);",
table_name, target_expr, into_exprs
))
}
RepartitionExpr::Merge(MergePartitionExpr {
table_name,
targets,
}) => {
let merge_exprs = targets
.iter()
.map(format_partition_expr_sql)
.collect::<Vec<_>>()
.join(",\n ");
Ok(format!(
"ALTER TABLE {} MERGE PARTITION (\n {}\n);",
table_name, merge_exprs
))
}
}
}
}
/// Renders a partition expression as text by converting it with `to_parser_expr`
/// and formatting the result with its `Display` implementation.
fn format_partition_expr_sql(expr: &PartitionExpr) -> String {
expr.to_parser_expr().to_string()
}
#[cfg(test)]
mod tests {
use datatypes::value::Value;
use partition::expr::col;
use super::RepartitionExprTranslator;
use crate::ir::repartition_expr::{MergePartitionExpr, RepartitionExpr, SplitPartitionExpr};
use crate::translator::DslTranslator;
/// A split of `id < 10` into two ranges renders as the expected SPLIT statement.
#[test]
fn test_translate_split_expr() {
let expr = RepartitionExpr::Split(SplitPartitionExpr {
table_name: "demo".into(),
target: col("id").lt(Value::Int32(10)),
into: vec![
col("id").lt(Value::Int32(5)),
col("id")
.gt_eq(Value::Int32(5))
.and(col("id").lt(Value::Int32(10))),
],
});
let sql = RepartitionExprTranslator.translate(&expr).unwrap();
let expected = r#"ALTER TABLE demo SPLIT PARTITION (
id < 10
) INTO (
id < 5,
id >= 5 AND id < 10
);"#;
assert_eq!(sql, expected);
}
/// A merge of two partition expressions renders as the expected MERGE statement.
#[test]
fn test_translate_merge_expr() {
let expr = RepartitionExpr::Merge(MergePartitionExpr {
table_name: "demo".into(),
targets: vec![
col("id").gt_eq(Value::Int32(10)),
col("id").gt_eq(Value::Int32(20)),
],
});
let sql = RepartitionExprTranslator.translate(&expr).unwrap();
let expected = r#"ALTER TABLE demo MERGE PARTITION (
id >= 10,
id >= 20
);"#;
assert_eq!(sql, expected);
}
}

View File

@@ -13,5 +13,6 @@
// limitations under the License.
pub mod column;
pub mod partition;
pub mod row;
pub mod table;

View File

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::{ResultExt, ensure};
use sqlx::MySqlPool;
use crate::error;
use crate::error::Result;
use crate::ir::Ident;
use crate::ir::create_expr::PartitionDef;
/// Query that lists the partition rows of a single table from
/// `information_schema.partitions`, ordered by `partition_ordinal_position`.
///
/// The table name is supplied through the single `?` placeholder; the query
/// does not filter on catalog or schema.
const PARTITIONS_INFO_SCHEMA_SQL: &str = "SELECT table_catalog, table_schema, table_name, \
partition_name, partition_expression, partition_description, greptime_partition_id, \
partition_ordinal_position FROM information_schema.partitions WHERE table_name = ? \
ORDER BY partition_ordinal_position;";
/// One row of `information_schema.partitions`.
///
/// The field names mirror the columns selected by `PARTITIONS_INFO_SCHEMA_SQL`,
/// which is what the derived `sqlx::FromRow` implementation decodes by name.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct PartitionInfo {
    /// Value of the `table_catalog` column.
    pub table_catalog: String,
    /// Value of the `table_schema` column.
    pub table_schema: String,
    /// Value of the `table_name` column (the column the query filters on).
    pub table_name: String,
    /// Value of the `partition_name` column.
    pub partition_name: String,
    /// Value of the `partition_expression` column.
    pub partition_expression: String,
    /// Value of the `partition_description` column; this is the text that
    /// `assert_partitions` compares against the expected partition expressions.
    pub partition_description: String,
    /// Value of the `greptime_partition_id` column.
    pub greptime_partition_id: u64,
    /// Value of the `partition_ordinal_position` column (the query's sort key).
    pub partition_ordinal_position: i64,
}
/// Fetches the partitions info from the information_schema.partitions table.
pub async fn fetch_partitions_info_schema(
db: &MySqlPool,
_schema_name: Ident,
table: &Ident,
) -> Result<Vec<PartitionInfo>> {
sqlx::query_as::<_, PartitionInfo>(PARTITIONS_INFO_SCHEMA_SQL)
.bind(&table.value)
.fetch_all(db)
.await
.context(error::ExecuteQuerySnafu {
sql: PARTITIONS_INFO_SCHEMA_SQL,
})
}
/// Checks that the partitions reported by the database match `expected`.
///
/// Two conditions must hold: `actual` has exactly as many entries as
/// `expected.exprs`, and the string form of every expected expression equals
/// the `partition_description` of at least one entry in `actual`. Order is
/// not compared.
///
/// # Errors
///
/// Returns an `AssertSnafu` error describing the first condition that fails.
pub fn assert_partitions(expected: &PartitionDef, actual: &[PartitionInfo]) -> Result<()> {
    let expected_len = expected.exprs.len();
    let actual_len = actual.len();
    ensure!(
        expected_len == actual_len,
        error::AssertSnafu {
            reason: format!(
                "Expected partitions length: {}, got: {}",
                expected_len, actual_len
            ),
        }
    );

    for expected_expr in expected.exprs.iter() {
        let expr = expected_expr.to_string();
        let found = actual
            .iter()
            .any(|info| info.partition_description == expr);
        // The list of actual descriptions is only rendered when the check fails.
        ensure!(
            found,
            error::AssertSnafu {
                reason: format!(
                    "Expected partition expression: {expr} not found, actual: {:?}",
                    actual
                        .iter()
                        .map(|info| info.partition_description.clone())
                        .collect::<Vec<_>>()
                        .join(", ")
                ),
            }
        );
    }

    Ok(())
}

View File

@@ -50,6 +50,7 @@ use tests_fuzz::utils::{
get_gt_fuzz_input_max_columns, init_greptime_connections_via_env,
};
use tests_fuzz::validator;
/// State carried through a fuzz run.
struct FuzzContext {
    /// `sqlx` connection pool using the MySQL driver.
    greptime: Pool<MySql>,
}

View File

@@ -0,0 +1,197 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![no_main]
use std::sync::Arc;
use arbitrary::{Arbitrary, Unstructured};
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use snafu::ResultExt;
use sqlx::{MySql, Pool};
use tests_fuzz::context::{TableContext, TableContextRef};
use tests_fuzz::error::{self, Result};
use tests_fuzz::fake::{
MappedGenerator, WordGenerator, merge_two_word_map_fn, random_capitalize_map,
uppercase_and_keyword_backtick_map,
};
use tests_fuzz::generator::Generator;
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
use tests_fuzz::generator::repartition_expr::{
MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder,
};
use tests_fuzz::ir::{CreateTableExpr, MySQLTsColumnTypeGenerator, RepartitionExpr};
use tests_fuzz::translator::DslTranslator;
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
use tests_fuzz::translator::mysql::repartition_expr::RepartitionExprTranslator;
use tests_fuzz::utils::{
Connections, get_fuzz_override, get_gt_fuzz_input_max_alter_actions,
init_greptime_connections_via_env,
};
use tests_fuzz::validator;
/// State carried through one run of the repartition fuzz target.
struct FuzzContext {
    /// `sqlx` connection pool (MySQL driver) on which every statement of the
    /// run is executed.
    greptime: Pool<MySql>,
}
impl FuzzContext {
    /// Consumes the context and waits for its connection pool to close.
    async fn close(self) {
        let Self { greptime: pool } = self;
        pool.close().await;
    }
}
/// Parameters that drive one run of the repartition fuzz target.
#[derive(Clone, Debug)]
struct FuzzInput {
    /// Seed for the `ChaChaRng` that generates the table and the repartition
    /// operations.
    seed: u64,
    /// Number of repartition statements to execute in the run.
    actions: usize,
    /// Partition count passed to the create-table generator.
    partitions: usize,
}
impl Arbitrary<'_> for FuzzInput {
    /// Derives a [`FuzzInput`] from the fuzzer-provided bytes.
    ///
    /// Each field can be pinned through `get_fuzz_override` (`SEED`,
    /// `PARTITIONS`, `ACTIONS`). Without an override:
    /// - `seed` is read from `u`;
    /// - `partitions` is drawn from `2..8` using an RNG seeded with `seed`;
    /// - `actions` is drawn from `1..max`, where `max` comes from
    ///   `get_gt_fuzz_input_max_alter_actions()`.
    ///
    /// # Errors
    ///
    /// Propagates the error from reading the seed out of `u`; this can only
    /// happen when no `SEED` override is present.
    fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
        // Only consume fuzz data when the seed is not overridden, so a pinned
        // seed never depends on (or fails because of) the fuzzer input.
        let seed = match get_fuzz_override::<u64>("SEED") {
            Some(seed) => seed,
            None => u.int_in_range(u64::MIN..=u64::MAX)?,
        };
        let mut rng = ChaChaRng::seed_from_u64(seed);
        let partitions =
            get_fuzz_override::<usize>("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8));
        // `random_range` panics on an empty range; clamping the exclusive
        // upper bound to at least 2 keeps `1..max_actions` non-empty when the
        // configured maximum is 0 or 1. Larger values are unaffected.
        let max_actions = get_gt_fuzz_input_max_alter_actions().max(2);
        let actions = get_fuzz_override::<usize>("ACTIONS")
            .unwrap_or_else(|| rng.random_range(1..max_actions));

        Ok(FuzzInput {
            seed,
            actions,
            partitions,
        })
    }
}
/// Generates the `CREATE TABLE` expression for the table under test.
///
/// The generator is configured for a `mito` table with 5 columns and
/// `input.partitions` partitions. Table names come from `WordGenerator`,
/// mapped through `random_capitalize_map` and
/// `uppercase_and_keyword_backtick_map`.
///
/// # Panics
///
/// Panics if the generator builder rejects this configuration.
fn generate_create_expr<R: Rng + 'static>(
    input: &FuzzInput,
    rng: &mut R,
) -> Result<CreateTableExpr> {
    CreateTableExprGeneratorBuilder::default()
        .name_generator(Box::new(MappedGenerator::new(
            WordGenerator,
            merge_two_word_map_fn(random_capitalize_map, uppercase_and_keyword_backtick_map),
        )))
        .columns(5)
        .partition(input.partitions)
        .engine("mito")
        .ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator))
        .build()
        .unwrap()
        .generate(rng)
}
/// Runs one repartition fuzz scenario against the database behind `ctx`.
///
/// Steps:
/// 1. Generate and execute a `CREATE TABLE` statement from `input`.
/// 2. `input.actions` times: generate a split or merge, execute it, apply the
///    same change to the local [`TableContext`], then compare the local
///    partition definition with the rows of `information_schema.partitions`.
/// 3. Drop the table and close the connection pool.
///
/// # Errors
///
/// Returns the first error from expression generation, SQL translation, query
/// execution, the local repartition bookkeeping, or partition validation. On
/// an early return the table is not dropped and the pool is not closed
/// explicitly.
///
/// # Panics
///
/// Panics if the table context has no partition definition.
async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
    info!("input: {input:?}");
    let mut rng = ChaChaRng::seed_from_u64(input.seed);

    // Create table
    // Propagate generation failures through the returned `Result` rather than
    // panicking here; the fuzz target reports every `Err` uniformly.
    let expr = generate_create_expr(&input, &mut rng)?;
    let translator = CreateTableExprTranslator;
    let sql = translator.translate(&expr)?;
    let result = sqlx::query(&sql)
        .execute(&ctx.greptime)
        .await
        .context(error::ExecuteQuerySnafu { sql: &sql })?;
    info!("Create table: {sql}, result: {result:?}");

    // Repartition table
    let mut table_ctx = Arc::new(TableContext::from(&expr));
    for i in 0..input.actions {
        let partition_num = table_ctx.partition.as_ref().unwrap().exprs.len();
        info!(
            "partition_num: {partition_num}, action: {}/{}",
            i + 1,
            input.actions,
        );
        let expr = repartition_operation(&table_ctx, &mut rng)?;
        let translator = RepartitionExprTranslator;
        let sql = translator.translate(&expr)?;
        info!("Repartition sql: {sql}");
        let result = sqlx::query(&sql)
            .execute(&ctx.greptime)
            .await
            .context(error::ExecuteQuerySnafu { sql: &sql })?;
        info!("result: {result:?}");

        // Mirror the executed statement in the local model; a failure here is
        // returned to the caller like every other step instead of unwrapping.
        table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).repartition(expr)?);

        // Validates partition expression
        let partition_entries = validator::partition::fetch_partitions_info_schema(
            &ctx.greptime,
            "public".into(),
            &table_ctx.name,
        )
        .await?;
        validator::partition::assert_partitions(
            table_ctx.partition.as_ref().unwrap(),
            &partition_entries,
        )?;
        // TODO(weny): inserts data and validates the data
    }

    // Cleans up
    let table_name = table_ctx.name.clone();
    let sql = format!("DROP TABLE {}", table_name);
    let result = sqlx::query(&sql)
        .execute(&ctx.greptime)
        .await
        .context(error::ExecuteQuerySnafu { sql })?;
    info!("Drop table: {}, result: {result:?}", table_name);
    ctx.close().await;

    Ok(())
}
/// Picks the next repartition operation for the table described by `table_ctx`.
///
/// A merge is generated only when the table has more than 2 partition
/// expressions and the coin flip did not select a split; in every other case
/// a split is generated.
///
/// # Panics
///
/// Panics if `table_ctx` has no partition definition, or if a generator
/// builder fails to build.
fn repartition_operation<R: Rng + 'static>(
    table_ctx: &TableContextRef,
    rng: &mut R,
) -> Result<RepartitionExpr> {
    // Drawn before the partition-count check, so one random value is consumed
    // on every call.
    let prefer_split = rng.random_bool(0.5);
    let partition_count = table_ctx.partition.as_ref().unwrap().exprs.len();

    // With at most 2 partition expressions we always split first.
    if partition_count > 2 && !prefer_split {
        let merge = MergePartitionExprGeneratorBuilder::default()
            .table_ctx(table_ctx.clone())
            .build()
            .unwrap()
            .generate(rng)?;
        return Ok(RepartitionExpr::Merge(merge));
    }

    let split = SplitPartitionExprGeneratorBuilder::default()
        .table_ctx(table_ctx.clone())
        .build()
        .unwrap()
        .generate(rng)?;
    Ok(RepartitionExpr::Split(split))
}
// Fuzz entry point: initialises logging, obtains the MySQL connection pool
// via `init_greptime_connections_via_env`, and runs one repartition scenario
// for `input`. Any error from the scenario becomes a panic.
fuzz_target!(|input: FuzzInput| {
    common_telemetry::init_default_ut_logging();
    common_runtime::block_on_global(async {
        let Connections { mysql } = init_greptime_connections_via_env().await;
        let greptime = mysql.expect("mysql connection init must be succeed");
        if let Err(err) = execute_repartition_table(FuzzContext { greptime }, input).await {
            panic!("fuzz test must be succeed: {err:?}");
        }
    })
});