mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-01 12:50:40 +00:00
test: fuzz unpartitioned repartition (#8195)
* test: fuzz unpartitioned repartition Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: invalidate table info cache Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: set max actions 128 Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: use InitStrategy::VersionChecked for table info cache Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: refine logs Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -14,7 +14,9 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::cache::{CacheContainer, Initializer, TableInfoCacheRef, TableNameCacheRef};
|
||||
use common_meta::cache::{
|
||||
CacheContainer, InitStrategy, Initializer, TableInfoCacheRef, TableNameCacheRef,
|
||||
};
|
||||
use common_meta::error::{Result as MetaResult, ValueNotExistSnafu};
|
||||
use common_meta::instruction::CacheIdent;
|
||||
use futures::future::BoxFuture;
|
||||
@@ -38,7 +40,14 @@ pub fn new_table_cache(
|
||||
) -> TableCache {
|
||||
let init = init_factory(table_info_cache, table_name_cache);
|
||||
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
CacheContainer::with_strategy(
|
||||
name,
|
||||
cache,
|
||||
Box::new(invalidator),
|
||||
init,
|
||||
filter,
|
||||
InitStrategy::VersionChecked,
|
||||
)
|
||||
}
|
||||
|
||||
fn init_factory(
|
||||
|
||||
@@ -17,7 +17,7 @@ mod flow;
|
||||
mod registry;
|
||||
mod table;
|
||||
|
||||
pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter};
|
||||
pub use container::{CacheContainer, InitStrategy, Initializer, Invalidator, TokenFilter};
|
||||
pub use flow::{TableFlownodeSetCache, TableFlownodeSetCacheRef, new_table_flownode_set_cache};
|
||||
pub use registry::{
|
||||
CacheRegistry, CacheRegistryBuilder, CacheRegistryRef, LayeredCacheRegistry,
|
||||
|
||||
@@ -440,7 +440,17 @@ impl Context {
|
||||
};
|
||||
let _ = self
|
||||
.cache_invalidator
|
||||
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
|
||||
.invalidate(
|
||||
&ctx,
|
||||
&[
|
||||
CacheIdent::TableId(table_id),
|
||||
CacheIdent::TableName(TableName {
|
||||
catalog_name: self.persistent_ctx.catalog_name.clone(),
|
||||
schema_name: self.persistent_ctx.schema_name.clone(),
|
||||
table_name: self.persistent_ctx.table_name.clone(),
|
||||
}),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -95,10 +95,19 @@ impl State for UpdatePartitionMetadata {
|
||||
|
||||
let mut new_table_info = table_info_value.table_info.clone();
|
||||
new_table_info.meta.partition_key_indices = partition_key_indices;
|
||||
common_telemetry::info!(
|
||||
"Update table partition metadata, table_id: {}, partition_key_indices: {:?}, partition_columns: {:?}",
|
||||
table_id,
|
||||
new_table_info.meta.partition_key_indices,
|
||||
new_table_info
|
||||
.meta
|
||||
.partition_column_names()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>(),
|
||||
);
|
||||
ctx.update_table_info(&table_info_value, table_info_value.update(new_table_info))
|
||||
.await?;
|
||||
// We don't invalidate cache here because the subsequent AllocateRegion step
|
||||
// will update the table route and invalidate the cache accordingly.
|
||||
ctx.invalidate_table_cache().await?;
|
||||
|
||||
Ok((
|
||||
Box::new(AllocateRegion::new(self.plan_entries.clone())),
|
||||
|
||||
@@ -200,6 +200,15 @@ impl TableContext {
|
||||
partitions.remove_bound(removed_idx)?;
|
||||
partition_def.exprs = partitions.generate()?;
|
||||
}
|
||||
RepartitionExpr::AlterPartitions(partition) => {
|
||||
ensure!(
|
||||
self.partition.is_none(),
|
||||
error::UnexpectedSnafu {
|
||||
violated: format!("Table {} already has partition", self.name),
|
||||
}
|
||||
);
|
||||
self.partition = Some(partition.partition);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(self)
|
||||
|
||||
@@ -44,6 +44,7 @@ pub struct CreateTableExprGenerator<R: Rng + 'static> {
|
||||
#[builder(setter(into))]
|
||||
engine: String,
|
||||
partition: usize,
|
||||
partition_column: bool,
|
||||
if_not_exists: bool,
|
||||
#[builder(setter(into))]
|
||||
name: Ident,
|
||||
@@ -67,6 +68,7 @@ impl<R: Rng + 'static> Default for CreateTableExprGenerator<R> {
|
||||
engine: DEFAULT_ENGINE.to_string(),
|
||||
if_not_exists: false,
|
||||
partition: 0,
|
||||
partition_column: false,
|
||||
name: Ident::new(""),
|
||||
with_clause: HashMap::default(),
|
||||
name_generator: Box::new(MappedGenerator::new(WordGenerator, random_capitalize_map)),
|
||||
@@ -95,7 +97,7 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateTableExprGenerato
|
||||
let mut builder = CreateTableExprBuilder::default();
|
||||
let mut columns = Vec::with_capacity(self.columns);
|
||||
let mut primary_keys = vec![];
|
||||
let need_partible_column = self.partition > 1;
|
||||
let need_partible_column = self.partition > 1 || self.partition_column;
|
||||
let mut column_names = self.name_generator.choose(rng, self.columns);
|
||||
|
||||
if self.columns == 1 {
|
||||
@@ -123,13 +125,15 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateTableExprGenerato
|
||||
)
|
||||
.remove(0);
|
||||
|
||||
// Generates partition bounds.
|
||||
let partition_def = generate_partition_def(
|
||||
self.partition,
|
||||
column.column_type.clone(),
|
||||
name.clone(),
|
||||
);
|
||||
builder.partition(partition_def);
|
||||
if self.partition > 1 {
|
||||
// Generates partition bounds.
|
||||
let partition_def = generate_partition_def(
|
||||
self.partition,
|
||||
column.column_type.clone(),
|
||||
name.clone(),
|
||||
);
|
||||
builder.partition(partition_def);
|
||||
}
|
||||
columns.push(column);
|
||||
}
|
||||
// Generates the ts column.
|
||||
@@ -178,11 +182,12 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreateTableExprGenerato
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_partition_def(
|
||||
pub fn generate_partition_def(
|
||||
partitions: usize,
|
||||
column_type: ConcreteDataType,
|
||||
column_name: Ident,
|
||||
) -> PartitionDef {
|
||||
assert!(partitions > 1, "partitions must be greater than 1");
|
||||
let bounds = generate_partition_bounds(&column_type, partitions - 1);
|
||||
let partitions = SimplePartitions::new(column_name.clone(), bounds);
|
||||
let partition_exprs = partitions.generate().unwrap();
|
||||
@@ -193,24 +198,23 @@ fn generate_partition_def(
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_metric_partition(partitions: usize) -> Option<(Column, PartitionDef)> {
|
||||
if partitions <= 1 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let partition_column = Column {
|
||||
fn metric_partition_column() -> Column {
|
||||
Column {
|
||||
name: Ident::new("host"),
|
||||
column_type: ConcreteDataType::string_datatype(),
|
||||
options: vec![ColumnOption::PrimaryKey],
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn generate_metric_partition_def(partitions: usize) -> PartitionDef {
|
||||
assert!(partitions > 1, "partitions must be greater than 1");
|
||||
let partition_column = metric_partition_column();
|
||||
let bounds = generate_partition_bounds(&partition_column.column_type, partitions - 1);
|
||||
let partitions = SimplePartitions::new(partition_column.name.clone(), bounds);
|
||||
let partition_def = PartitionDef {
|
||||
PartitionDef {
|
||||
columns: vec![partitions.column_name.clone()],
|
||||
exprs: partitions.generate().unwrap(),
|
||||
};
|
||||
|
||||
Some((partition_column, partition_def))
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a physical table with 2 columns: ts of TimestampType::Millisecond as time index and val of Float64Type.
|
||||
@@ -223,6 +227,8 @@ pub struct CreatePhysicalTableExprGenerator<R: Rng + 'static> {
|
||||
if_not_exists: bool,
|
||||
#[builder(default = "0")]
|
||||
partition: usize,
|
||||
#[builder(default = "false")]
|
||||
partition_column: bool,
|
||||
#[builder(default, setter(into))]
|
||||
with_clause: HashMap<String, String>,
|
||||
}
|
||||
@@ -252,11 +258,13 @@ impl<R: Rng + 'static> Generator<CreateTableExpr, R> for CreatePhysicalTableExpr
|
||||
|
||||
let mut partition = None;
|
||||
let mut primary_keys = vec![];
|
||||
if let Some((partition_column, partition_def)) = generate_metric_partition(self.partition) {
|
||||
columns.push(partition_column);
|
||||
partition = Some(partition_def);
|
||||
if self.partition > 1 || self.partition_column {
|
||||
columns.push(metric_partition_column());
|
||||
primary_keys.push(columns.len() - 1);
|
||||
}
|
||||
if self.partition > 1 {
|
||||
partition = Some(generate_metric_partition_def(self.partition));
|
||||
}
|
||||
|
||||
Ok(CreateTableExpr {
|
||||
table_name: self.name_generator.generate(rng),
|
||||
@@ -387,6 +395,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::context::TableContext;
|
||||
use crate::ir::PARTIBLE_DATA_TYPES;
|
||||
|
||||
#[test]
|
||||
fn test_float64() {
|
||||
@@ -423,6 +432,18 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(expr.columns.len(), 10);
|
||||
assert!(expr.partition.is_none());
|
||||
|
||||
let expr = CreateTableExprGeneratorBuilder::default()
|
||||
.columns(10)
|
||||
.partition(1)
|
||||
.partition_column(true)
|
||||
.build()
|
||||
.unwrap()
|
||||
.generate(&mut rng)
|
||||
.unwrap();
|
||||
assert_eq!(expr.columns.len(), 10);
|
||||
assert!(expr.partition.is_none());
|
||||
assert!(PARTIBLE_DATA_TYPES.contains(&expr.columns[0].column_type));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -516,6 +537,25 @@ mod tests {
|
||||
assert_eq!(physical_table_expr.partition.unwrap().exprs.len(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_physical_table_expr_generator_with_partition_column() {
|
||||
let mut rng = rand::rng();
|
||||
let physical_table_expr = CreatePhysicalTableExprGeneratorBuilder::default()
|
||||
.partition(1)
|
||||
.partition_column(true)
|
||||
.if_not_exists(false)
|
||||
.build()
|
||||
.unwrap()
|
||||
.generate(&mut rng)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(physical_table_expr.engine, "metric");
|
||||
assert!(physical_table_expr.partition.is_none());
|
||||
assert_eq!(physical_table_expr.columns.len(), 3);
|
||||
assert_eq!(physical_table_expr.columns[2].name, Ident::new("host"));
|
||||
assert_eq!(physical_table_expr.primary_keys, vec![2]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_logical_table_expr_generator_without_partition_column() {
|
||||
let mut rng = rand::rng();
|
||||
|
||||
@@ -30,7 +30,7 @@ use std::time::Duration;
|
||||
pub use alter_expr::{AlterTableExpr, AlterTableOption};
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::{Date, Timestamp};
|
||||
pub use create_expr::{CreateDatabaseExpr, CreateTableExpr};
|
||||
pub use create_expr::{CreateDatabaseExpr, CreateTableExpr, PartitionDef};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::types::TimestampType;
|
||||
use datatypes::value::Value;
|
||||
@@ -40,7 +40,7 @@ use lazy_static::lazy_static;
|
||||
pub use partition_expr::SimplePartitions;
|
||||
use rand::Rng;
|
||||
use rand::seq::{IndexedRandom, SliceRandom};
|
||||
pub use repartition_expr::RepartitionExpr;
|
||||
pub use repartition_expr::{AlterTablePartitionsExpr, RepartitionExpr};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use self::insert_expr::RowValues;
|
||||
|
||||
@@ -16,6 +16,7 @@ use partition::expr::PartitionExpr;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::ir::Ident;
|
||||
use crate::ir::create_expr::PartitionDef;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SplitPartitionExpr {
|
||||
@@ -34,10 +35,19 @@ pub struct MergePartitionExpr {
|
||||
pub wait: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlterTablePartitionsExpr {
|
||||
pub table_name: Ident,
|
||||
pub partition: PartitionDef,
|
||||
#[serde(default = "default_wait")]
|
||||
pub wait: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum RepartitionExpr {
|
||||
Split(SplitPartitionExpr),
|
||||
Merge(MergePartitionExpr),
|
||||
AlterPartitions(AlterTablePartitionsExpr),
|
||||
}
|
||||
|
||||
const fn default_wait() -> bool {
|
||||
|
||||
@@ -15,7 +15,10 @@
|
||||
use partition::expr::PartitionExpr;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::ir::repartition_expr::{MergePartitionExpr, RepartitionExpr, SplitPartitionExpr};
|
||||
use crate::ir::create_expr::PartitionDef;
|
||||
use crate::ir::repartition_expr::{
|
||||
AlterTablePartitionsExpr, MergePartitionExpr, RepartitionExpr, SplitPartitionExpr,
|
||||
};
|
||||
use crate::translator::DslTranslator;
|
||||
|
||||
pub struct RepartitionExprTranslator;
|
||||
@@ -59,10 +62,38 @@ impl DslTranslator<RepartitionExpr, String> for RepartitionExprTranslator {
|
||||
table_name, merge_exprs, wait_clause
|
||||
))
|
||||
}
|
||||
RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr {
|
||||
table_name,
|
||||
partition,
|
||||
wait,
|
||||
}) => {
|
||||
let partition_clause = format_partition_clause(partition);
|
||||
let wait_clause = format_wait_clause(*wait);
|
||||
Ok(format!(
|
||||
"ALTER TABLE {} {}{};",
|
||||
table_name, partition_clause, wait_clause
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn format_partition_clause(partition: &PartitionDef) -> String {
|
||||
let columns = partition
|
||||
.columns
|
||||
.iter()
|
||||
.map(|column| column.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let exprs = partition
|
||||
.exprs
|
||||
.iter()
|
||||
.map(format_partition_expr_sql)
|
||||
.collect::<Vec<_>>()
|
||||
.join(",\n ");
|
||||
format!("PARTITION ON COLUMNS ({columns}) (\n {exprs}\n)")
|
||||
}
|
||||
|
||||
fn format_partition_expr_sql(expr: &PartitionExpr) -> String {
|
||||
expr.to_parser_expr().to_string()
|
||||
}
|
||||
@@ -79,9 +110,15 @@ fn format_wait_clause(wait: bool) -> String {
|
||||
mod tests {
|
||||
use datatypes::value::Value;
|
||||
use partition::expr::col;
|
||||
use sql::dialect::GreptimeDbDialect;
|
||||
use sql::parser::{ParseOptions, ParserContext};
|
||||
|
||||
use super::RepartitionExprTranslator;
|
||||
use crate::ir::repartition_expr::{MergePartitionExpr, RepartitionExpr, SplitPartitionExpr};
|
||||
use crate::ir::Ident;
|
||||
use crate::ir::create_expr::PartitionDef;
|
||||
use crate::ir::repartition_expr::{
|
||||
AlterTablePartitionsExpr, MergePartitionExpr, RepartitionExpr, SplitPartitionExpr,
|
||||
};
|
||||
use crate::translator::DslTranslator;
|
||||
|
||||
#[test]
|
||||
@@ -149,4 +186,61 @@ mod tests {
|
||||
);"#;
|
||||
assert_eq!(sql, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_translate_alter_table_partitions_expr() {
|
||||
let expr = RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr {
|
||||
table_name: "demo".into(),
|
||||
partition: PartitionDef {
|
||||
columns: vec![Ident::new("id")],
|
||||
exprs: vec![
|
||||
col("id").lt(Value::Int32(10)),
|
||||
col("id")
|
||||
.gt_eq(Value::Int32(10))
|
||||
.and(col("id").lt(Value::Int32(20))),
|
||||
col("id").gt_eq(Value::Int32(20)),
|
||||
],
|
||||
},
|
||||
wait: true,
|
||||
});
|
||||
let sql = RepartitionExprTranslator.translate(&expr).unwrap();
|
||||
let expected = r#"ALTER TABLE demo PARTITION ON COLUMNS (id) (
|
||||
id < 10,
|
||||
id >= 10 AND id < 20,
|
||||
id >= 20
|
||||
);"#;
|
||||
assert_eq!(sql, expected);
|
||||
assert_repartition_sql_parseable(&sql);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_translate_alter_table_partitions_expr_wait_false() {
|
||||
let expr = RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr {
|
||||
table_name: "demo".into(),
|
||||
partition: PartitionDef {
|
||||
columns: vec![Ident::new("host")],
|
||||
exprs: vec![
|
||||
col("host").lt(Value::from("m")),
|
||||
col("host").gt_eq(Value::from("m")),
|
||||
],
|
||||
},
|
||||
wait: false,
|
||||
});
|
||||
let sql = RepartitionExprTranslator.translate(&expr).unwrap();
|
||||
let expected = r#"ALTER TABLE demo PARTITION ON COLUMNS (host) (
|
||||
host < 'm',
|
||||
host >= 'm'
|
||||
) WITH (
|
||||
WAIT = false
|
||||
);"#;
|
||||
assert_eq!(sql, expected);
|
||||
assert_repartition_sql_parseable(&sql);
|
||||
}
|
||||
|
||||
fn assert_repartition_sql_parseable(sql: &str) {
|
||||
let statements =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap();
|
||||
assert_eq!(statements.len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,8 @@ use crate::ir::Ident;
|
||||
use crate::ir::create_expr::PartitionDef;
|
||||
|
||||
const PARTITIONS_INFO_SCHEMA_SQL: &str = "SELECT table_catalog, table_schema, table_name, \
|
||||
partition_name, partition_expression, partition_description, greptime_partition_id, \
|
||||
partition_name, COALESCE(partition_expression, '') AS partition_expression, \
|
||||
COALESCE(partition_description, '') AS partition_description, greptime_partition_id, \
|
||||
partition_ordinal_position FROM information_schema.partitions WHERE table_name = ? \
|
||||
ORDER BY partition_ordinal_position;";
|
||||
|
||||
@@ -91,3 +92,20 @@ pub fn assert_partitions(expected: &PartitionDef, actual: &[PartitionInfo]) -> R
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Asserts that the table has no partition metadata in information schema.
|
||||
pub fn assert_unpartitioned(actual: &[PartitionInfo]) -> Result<()> {
|
||||
let has_no_partition_metadata = actual.is_empty()
|
||||
|| (actual.len() == 1
|
||||
&& actual[0].partition_expression.is_empty()
|
||||
&& actual[0].partition_description.is_empty());
|
||||
|
||||
ensure!(
|
||||
has_no_partition_metadata,
|
||||
error::AssertSnafu {
|
||||
reason: format!("Expected unpartitioned table, got partitions: {actual:?}"),
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -36,14 +36,15 @@ use tests_fuzz::fake::{
|
||||
use tests_fuzz::generator::Generator;
|
||||
use tests_fuzz::generator::create_expr::{
|
||||
CreateLogicalTableExprGeneratorBuilder, CreatePhysicalTableExprGeneratorBuilder,
|
||||
generate_metric_partition_def,
|
||||
};
|
||||
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
|
||||
use tests_fuzz::generator::repartition_expr::{
|
||||
MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder,
|
||||
};
|
||||
use tests_fuzz::ir::{
|
||||
CreateTableExpr, Ident, InsertIntoExpr, RepartitionExpr, generate_random_value,
|
||||
generate_unique_timestamp_for_mysql_with_clock,
|
||||
AlterTablePartitionsExpr, CreateTableExpr, Ident, InsertIntoExpr, PartitionDef,
|
||||
RepartitionExpr, generate_random_value, generate_unique_timestamp_for_mysql_with_clock,
|
||||
};
|
||||
use tests_fuzz::translator::DslTranslator;
|
||||
use tests_fuzz::translator::csv::InsertExprToCsvRecordsTranslator;
|
||||
@@ -94,6 +95,7 @@ fn generate_create_physical_table_expr<R: Rng + 'static>(
|
||||
))))
|
||||
.if_not_exists(rng.random_bool(0.5))
|
||||
.partition(partitions)
|
||||
.partition_column(partitions <= 1)
|
||||
.build()
|
||||
.unwrap()
|
||||
.generate(rng)
|
||||
@@ -158,12 +160,6 @@ async fn create_metric_tables<R: Rng + 'static>(
|
||||
})?;
|
||||
info!("Create physical table: {create_physical_sql}, result: {result:?}");
|
||||
let physical_table_ctx = Arc::new(TableContext::from(&create_physical_expr));
|
||||
ensure!(
|
||||
physical_table_ctx.partition.is_some(),
|
||||
error::AssertSnafu {
|
||||
reason: "Physical metric table must have partition".to_string()
|
||||
}
|
||||
);
|
||||
|
||||
let mut logical_tables = BTreeMap::new();
|
||||
let mut create_logical_sqls = HashMap::new();
|
||||
@@ -436,6 +432,11 @@ fn repartition_operation<R: Rng + 'static>(
|
||||
table_ctx: &TableContextRef,
|
||||
rng: &mut R,
|
||||
) -> Result<RepartitionExpr> {
|
||||
if table_ctx.partition.is_none() {
|
||||
let partition = generate_metric_partition_def(rng.random_range(2..8));
|
||||
return Ok(alter_table_partitions_expr(table_ctx, partition, true));
|
||||
}
|
||||
|
||||
let split = rng.random_bool(0.5);
|
||||
if table_ctx.partition.as_ref().unwrap().exprs.len() <= 2 || split {
|
||||
let expr = SplitPartitionExprGeneratorBuilder::default()
|
||||
@@ -454,19 +455,35 @@ fn repartition_operation<R: Rng + 'static>(
|
||||
}
|
||||
}
|
||||
|
||||
fn alter_table_partitions_expr(
|
||||
table_ctx: &TableContextRef,
|
||||
partition: PartitionDef,
|
||||
wait: bool,
|
||||
) -> RepartitionExpr {
|
||||
RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr {
|
||||
table_name: table_ctx.name.clone(),
|
||||
partition,
|
||||
wait,
|
||||
})
|
||||
}
|
||||
|
||||
impl Arbitrary<'_> for FuzzInput {
|
||||
fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
|
||||
let seed = get_fuzz_override::<u64>("SEED").unwrap_or(u.int_in_range(u64::MIN..=u64::MAX)?);
|
||||
let mut rng = ChaChaRng::seed_from_u64(seed);
|
||||
let partitions =
|
||||
get_fuzz_override::<usize>("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8));
|
||||
let partitions = get_fuzz_override::<usize>("PARTITIONS").unwrap_or_else(|| {
|
||||
if rng.random_bool(0.5) {
|
||||
1
|
||||
} else {
|
||||
rng.random_range(2..8)
|
||||
}
|
||||
});
|
||||
let max_tables = get_gt_fuzz_input_max_tables();
|
||||
let tables = get_fuzz_override::<usize>("TABLES")
|
||||
.unwrap_or_else(|| rng.random_range(1..=std::cmp::max(1, max_tables)));
|
||||
let max_actions = get_gt_fuzz_input_max_alter_actions();
|
||||
let max_actions = std::cmp::min(128, get_gt_fuzz_input_max_alter_actions());
|
||||
let actions = get_fuzz_override::<usize>("ACTIONS")
|
||||
.unwrap_or_else(|| rng.random_range(1..max_actions));
|
||||
|
||||
Ok(FuzzInput {
|
||||
seed,
|
||||
actions,
|
||||
@@ -536,7 +553,11 @@ async fn execute_repartition_metric_table(ctx: FuzzContext, input: FuzzInput) ->
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
for i in 0..input.actions {
|
||||
let partition_num = physical_table_ctx.partition.as_ref().unwrap().exprs.len();
|
||||
let partition_num = physical_table_ctx
|
||||
.partition
|
||||
.as_ref()
|
||||
.map(|partition| partition.exprs.len())
|
||||
.unwrap_or_default();
|
||||
info!(
|
||||
"partition_num: {partition_num}, action: {}/{}, table: {}, logical table num: {}",
|
||||
i + 1,
|
||||
|
||||
@@ -33,14 +33,15 @@ use tests_fuzz::fake::{
|
||||
uppercase_and_keyword_backtick_map,
|
||||
};
|
||||
use tests_fuzz::generator::Generator;
|
||||
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
|
||||
use tests_fuzz::generator::create_expr::{CreateTableExprGeneratorBuilder, generate_partition_def};
|
||||
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
|
||||
use tests_fuzz::generator::repartition_expr::{
|
||||
MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder,
|
||||
};
|
||||
use tests_fuzz::ir::{
|
||||
CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, RepartitionExpr, RowValue,
|
||||
SimplePartitions, generate_partition_value, generate_unique_timestamp_for_mysql_with_clock,
|
||||
AlterTablePartitionsExpr, CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator,
|
||||
PartitionDef, RepartitionExpr, RowValue, SimplePartitions, generate_partition_value,
|
||||
generate_unique_timestamp_for_mysql_with_clock,
|
||||
};
|
||||
use tests_fuzz::translator::DslTranslator;
|
||||
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
|
||||
@@ -75,8 +76,13 @@ impl Arbitrary<'_> for FuzzInput {
|
||||
fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
|
||||
let seed = get_fuzz_override::<u64>("SEED").unwrap_or(u.int_in_range(u64::MIN..=u64::MAX)?);
|
||||
let mut rng = ChaChaRng::seed_from_u64(seed);
|
||||
let partitions =
|
||||
get_fuzz_override::<usize>("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8));
|
||||
let partitions = get_fuzz_override::<usize>("PARTITIONS").unwrap_or_else(|| {
|
||||
if rng.random_bool(0.5) {
|
||||
1
|
||||
} else {
|
||||
rng.random_range(2..8)
|
||||
}
|
||||
});
|
||||
let max_actions = get_gt_fuzz_input_max_alter_actions();
|
||||
let actions = get_fuzz_override::<usize>("ACTIONS")
|
||||
.unwrap_or_else(|| rng.random_range(1..max_actions));
|
||||
@@ -99,6 +105,7 @@ fn generate_create_expr<R: Rng + 'static>(
|
||||
)))
|
||||
.columns(5)
|
||||
.partition(input.partitions)
|
||||
.partition_column(input.partitions <= 1)
|
||||
.engine("mito")
|
||||
.ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator))
|
||||
.build()
|
||||
@@ -122,7 +129,7 @@ fn build_insert_expr<R: Rng + 'static>(
|
||||
let ts_value_generator = generate_unique_timestamp_for_mysql_with_clock(clock.clone());
|
||||
let counter = Arc::new(AtomicUsize::new(0));
|
||||
let counter_clone = counter.clone();
|
||||
let partition_len = table_ctx.partition.as_ref().unwrap().exprs.len();
|
||||
let partition_len = partitions.bounds.len() + 1;
|
||||
let row = rng.random_range(partition_len..partition_len * 2);
|
||||
|
||||
let moved_partitions = partitions.clone();
|
||||
@@ -150,6 +157,28 @@ fn build_insert_expr<R: Rng + 'static>(
|
||||
insert_generator.generate(rng).unwrap()
|
||||
}
|
||||
|
||||
fn alter_table_partitions_expr(
|
||||
table_ctx: &TableContextRef,
|
||||
partition: PartitionDef,
|
||||
wait: bool,
|
||||
) -> RepartitionExpr {
|
||||
RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr {
|
||||
table_name: table_ctx.name.clone(),
|
||||
partition,
|
||||
wait,
|
||||
})
|
||||
}
|
||||
|
||||
fn alter_table_partitions_expr_from_table_ctx<R: Rng + 'static>(
|
||||
table_ctx: &TableContextRef,
|
||||
rng: &mut R,
|
||||
wait: bool,
|
||||
) -> RepartitionExpr {
|
||||
let column = table_ctx.columns[0].clone();
|
||||
let partition = generate_partition_def(rng.random_range(2..8), column.column_type, column.name);
|
||||
alter_table_partitions_expr(table_ctx, partition, wait)
|
||||
}
|
||||
|
||||
async fn execute_insert_with_retry(ctx: &FuzzContext, sql: &str) -> Result<()> {
|
||||
let mut delay = Duration::from_millis(100);
|
||||
let mut attempt = 0;
|
||||
@@ -236,9 +265,36 @@ async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result
|
||||
inserted_rows: 0,
|
||||
}));
|
||||
|
||||
let mut action_start = 0;
|
||||
if table_ctx.partition.is_none() {
|
||||
let expr = alter_table_partitions_expr_from_table_ctx(&table_ctx, &mut rng, true);
|
||||
let translator = RepartitionExprTranslator;
|
||||
let sql = translator.translate(&expr)?;
|
||||
info!("Initial partition sql: {sql}");
|
||||
let result = sqlx::query(&sql)
|
||||
.execute(&ctx.greptime)
|
||||
.await
|
||||
.context(error::ExecuteQuerySnafu { sql: &sql })?;
|
||||
info!("Initial partition result: {result:?}");
|
||||
table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).repartition(expr).unwrap());
|
||||
shared_state.lock().unwrap().table_ctx = table_ctx.clone();
|
||||
|
||||
let partition_entries = validator::partition::fetch_partitions_info_schema(
|
||||
&ctx.greptime,
|
||||
"public".into(),
|
||||
&table_ctx.name,
|
||||
)
|
||||
.await?;
|
||||
validator::partition::assert_partitions(
|
||||
table_ctx.partition.as_ref().unwrap(),
|
||||
&partition_entries,
|
||||
)?;
|
||||
action_start = 1;
|
||||
}
|
||||
|
||||
let writer_rng = ChaChaRng::seed_from_u64(input.seed);
|
||||
let writer_task = tokio::spawn(write_loop(writer_rng, ctx.clone(), shared_state.clone()));
|
||||
for i in 0..input.actions {
|
||||
for i in action_start..input.actions {
|
||||
let partition_num = table_ctx.partition.as_ref().unwrap().exprs.len();
|
||||
info!(
|
||||
"partition_num: {partition_num}, action: {}/{}",
|
||||
|
||||
@@ -34,14 +34,15 @@ use tests_fuzz::fake::{
|
||||
uppercase_and_keyword_backtick_map,
|
||||
};
|
||||
use tests_fuzz::generator::Generator;
|
||||
use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder;
|
||||
use tests_fuzz::generator::create_expr::{CreateTableExprGeneratorBuilder, generate_partition_def};
|
||||
use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder;
|
||||
use tests_fuzz::generator::repartition_expr::{
|
||||
MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder,
|
||||
};
|
||||
use tests_fuzz::ir::{
|
||||
CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, RepartitionExpr, RowValue,
|
||||
SimplePartitions, generate_partition_value, generate_unique_timestamp_for_mysql_with_clock,
|
||||
AlterTablePartitionsExpr, CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator,
|
||||
PartitionDef, RepartitionExpr, RowValue, SimplePartitions, generate_partition_value,
|
||||
generate_unique_timestamp_for_mysql_with_clock,
|
||||
};
|
||||
use tests_fuzz::translator::DslTranslator;
|
||||
use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator;
|
||||
@@ -93,13 +94,17 @@ impl Arbitrary<'_> for FuzzInput {
|
||||
let mut rng = ChaChaRng::seed_from_u64(seed);
|
||||
let rows = get_fuzz_override::<usize>("ROWS")
|
||||
.unwrap_or_else(|| rng.random_range(2..get_gt_fuzz_input_max_rows()));
|
||||
let partitions =
|
||||
get_fuzz_override::<usize>("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8));
|
||||
let partitions = get_fuzz_override::<usize>("PARTITIONS").unwrap_or_else(|| {
|
||||
if rng.random_bool(0.5) {
|
||||
1
|
||||
} else {
|
||||
rng.random_range(2..8)
|
||||
}
|
||||
});
|
||||
let chaos_delay_ms =
|
||||
get_fuzz_override::<u64>("CHAOS_DELAY_MS").unwrap_or_else(|| rng.random_range(0..5000));
|
||||
let chaos_hold_secs =
|
||||
get_fuzz_override::<u64>("CHAOS_HOLD_SECS").unwrap_or_else(|| rng.random_range(10..20));
|
||||
|
||||
Ok(FuzzInput {
|
||||
seed,
|
||||
rows,
|
||||
@@ -127,6 +132,7 @@ fn generate_create_expr<R: Rng + 'static>(
|
||||
)))
|
||||
.columns(5)
|
||||
.partition(input.partitions)
|
||||
.partition_column(input.partitions <= 1)
|
||||
.engine("mito")
|
||||
.ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator))
|
||||
.build()
|
||||
@@ -144,7 +150,7 @@ fn build_insert_expr<R: Rng + 'static>(
|
||||
let ts_value_generator = generate_unique_timestamp_for_mysql_with_clock(clock.clone());
|
||||
let counter = Arc::new(AtomicUsize::new(0));
|
||||
let counter_clone = counter.clone();
|
||||
let partition_len = table_ctx.partition.as_ref().unwrap().exprs.len();
|
||||
let partition_len = partitions.bounds.len() + 1;
|
||||
let moved_partitions = partitions.clone();
|
||||
let insert_generator = InsertExprGeneratorBuilder::default()
|
||||
.table_ctx(table_ctx.clone())
|
||||
@@ -202,10 +208,12 @@ async fn create_table(ctx: &FuzzContext, expr: &CreateTableExpr) -> Result<Table
|
||||
async fn insert_initial_rows<R: Rng + 'static>(
|
||||
ctx: &FuzzContext,
|
||||
table_ctx: &TableContextRef,
|
||||
partition_def: &PartitionDef,
|
||||
rng: &mut R,
|
||||
rows: usize,
|
||||
) -> Result<u64> {
|
||||
let partitions = SimplePartitions::from_table_ctx(table_ctx).unwrap();
|
||||
let partitions =
|
||||
SimplePartitions::from_exprs(partition_def.columns[0].clone(), &partition_def.exprs)?;
|
||||
let clock = Arc::new(Mutex::new(Timestamp::current_millis()));
|
||||
let insert_expr = build_insert_expr(table_ctx, rng, &partitions, &clock, rows);
|
||||
let inserted_rows = insert_expr.values_list.len() as u64;
|
||||
@@ -260,6 +268,28 @@ fn repartition_operation<R: Rng + 'static>(
|
||||
}
|
||||
}
|
||||
|
||||
fn alter_table_partitions_expr(
|
||||
table_ctx: &TableContextRef,
|
||||
partition: PartitionDef,
|
||||
wait: bool,
|
||||
) -> RepartitionExpr {
|
||||
RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr {
|
||||
table_name: table_ctx.name.clone(),
|
||||
partition,
|
||||
wait,
|
||||
})
|
||||
}
|
||||
|
||||
fn alter_table_partitions_expr_from_table_ctx<R: Rng + 'static>(
|
||||
table_ctx: &TableContextRef,
|
||||
rng: &mut R,
|
||||
wait: bool,
|
||||
) -> RepartitionExpr {
|
||||
let column = table_ctx.columns[0].clone();
|
||||
let partition = generate_partition_def(rng.random_range(2..8), column.column_type, column.name);
|
||||
alter_table_partitions_expr(table_ctx, partition, wait)
|
||||
}
|
||||
|
||||
async fn submit_repartition_procedure(ctx: &FuzzContext, expr: &RepartitionExpr) -> Result<String> {
|
||||
let translator = RepartitionExprTranslator;
|
||||
let sql = translator.translate(expr)?;
|
||||
@@ -334,10 +364,13 @@ async fn validate_terminal_metadata(
|
||||
after_table_ctx.partition.as_ref().unwrap(),
|
||||
&partition_entries,
|
||||
)?,
|
||||
ProcedureTerminalState::Failed => validator::partition::assert_partitions(
|
||||
before_table_ctx.partition.as_ref().unwrap(),
|
||||
&partition_entries,
|
||||
)?,
|
||||
ProcedureTerminalState::Failed => {
|
||||
if let Some(partition) = before_table_ctx.partition.as_ref() {
|
||||
validator::partition::assert_partitions(partition, &partition_entries)?;
|
||||
} else {
|
||||
validator::partition::assert_unpartitioned(&partition_entries)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -359,7 +392,21 @@ async fn execute_repartition_chaos(ctx: FuzzContext, input: FuzzInput) -> Result
|
||||
|
||||
let create_expr = generate_create_expr(&input, &mut rng)?;
|
||||
let before_table_ctx = create_table(&ctx, &create_expr).await?;
|
||||
let inserted_rows = insert_initial_rows(&ctx, &before_table_ctx, &mut rng, input.rows).await?;
|
||||
let insert_partition = create_expr.partition.clone().unwrap_or_else(|| {
|
||||
generate_partition_def(
|
||||
2,
|
||||
before_table_ctx.columns[0].column_type.clone(),
|
||||
before_table_ctx.columns[0].name.clone(),
|
||||
)
|
||||
});
|
||||
let inserted_rows = insert_initial_rows(
|
||||
&ctx,
|
||||
&before_table_ctx,
|
||||
&insert_partition,
|
||||
&mut rng,
|
||||
input.rows,
|
||||
)
|
||||
.await?;
|
||||
validate_table_rows(&ctx, &before_table_ctx, inserted_rows).await?;
|
||||
|
||||
let before_entries = validator::partition::fetch_partitions_info_schema(
|
||||
@@ -370,7 +417,11 @@ async fn execute_repartition_chaos(ctx: FuzzContext, input: FuzzInput) -> Result
|
||||
.await?;
|
||||
info!("Before repartition partition entries: {before_entries:?}");
|
||||
|
||||
let repartition_expr = repartition_operation(&before_table_ctx, &mut rng, false)?;
|
||||
let repartition_expr = if before_table_ctx.partition.is_some() {
|
||||
repartition_operation(&before_table_ctx, &mut rng, false)?
|
||||
} else {
|
||||
alter_table_partitions_expr_from_table_ctx(&before_table_ctx, &mut rng, false)
|
||||
};
|
||||
let after_table_ctx = Arc::new(
|
||||
Arc::unwrap_or_clone(before_table_ctx.clone())
|
||||
.repartition(repartition_expr.clone())
|
||||
|
||||
Reference in New Issue
Block a user