diff --git a/src/catalog/src/kvbackend/table_cache.rs b/src/catalog/src/kvbackend/table_cache.rs index 42b3fbc74b..13f74a48c9 100644 --- a/src/catalog/src/kvbackend/table_cache.rs +++ b/src/catalog/src/kvbackend/table_cache.rs @@ -14,7 +14,9 @@ use std::sync::Arc; -use common_meta::cache::{CacheContainer, Initializer, TableInfoCacheRef, TableNameCacheRef}; +use common_meta::cache::{ + CacheContainer, InitStrategy, Initializer, TableInfoCacheRef, TableNameCacheRef, +}; use common_meta::error::{Result as MetaResult, ValueNotExistSnafu}; use common_meta::instruction::CacheIdent; use futures::future::BoxFuture; @@ -38,7 +40,14 @@ pub fn new_table_cache( ) -> TableCache { let init = init_factory(table_info_cache, table_name_cache); - CacheContainer::new(name, cache, Box::new(invalidator), init, filter) + CacheContainer::with_strategy( + name, + cache, + Box::new(invalidator), + init, + filter, + InitStrategy::VersionChecked, + ) } fn init_factory( diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index f16290937a..c26a0fab76 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -17,7 +17,7 @@ mod flow; mod registry; mod table; -pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter}; +pub use container::{CacheContainer, InitStrategy, Initializer, Invalidator, TokenFilter}; pub use flow::{TableFlownodeSetCache, TableFlownodeSetCacheRef, new_table_flownode_set_cache}; pub use registry::{ CacheRegistry, CacheRegistryBuilder, CacheRegistryRef, LayeredCacheRegistry, diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index c1819cb364..8c7c1dfeff 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -440,7 +440,17 @@ impl Context { }; let _ = self .cache_invalidator - .invalidate(&ctx, &[CacheIdent::TableId(table_id)]) + .invalidate( + &ctx, + &[ + CacheIdent::TableId(table_id), + CacheIdent::TableName(TableName { + catalog_name: self.persistent_ctx.catalog_name.clone(), + schema_name: self.persistent_ctx.schema_name.clone(), + table_name: self.persistent_ctx.table_name.clone(), + }), + ], + ) .await; Ok(()) } diff --git a/src/meta-srv/src/procedure/repartition/update_partition_metadata.rs b/src/meta-srv/src/procedure/repartition/update_partition_metadata.rs index cc9ca1c9bb..8d68a6e8fe 100644 --- a/src/meta-srv/src/procedure/repartition/update_partition_metadata.rs +++ b/src/meta-srv/src/procedure/repartition/update_partition_metadata.rs @@ -95,10 +95,19 @@ impl State for UpdatePartitionMetadata { let mut new_table_info = table_info_value.table_info.clone(); new_table_info.meta.partition_key_indices = partition_key_indices; + common_telemetry::info!( + "Update table partition metadata, table_id: {}, partition_key_indices: {:?}, partition_columns: {:?}", + table_id, + new_table_info.meta.partition_key_indices, + new_table_info + .meta + .partition_column_names() + .cloned() + .collect::>(), + ); ctx.update_table_info(&table_info_value, table_info_value.update(new_table_info)) .await?; - // We don't invalidate cache here because the subsequent AllocateRegion step - // will update the table route and invalidate the cache accordingly. + ctx.invalidate_table_cache().await?; Ok(( Box::new(AllocateRegion::new(self.plan_entries.clone())), diff --git a/tests-fuzz/src/context.rs b/tests-fuzz/src/context.rs index 2f65ad56aa..8043e166fd 100644 --- a/tests-fuzz/src/context.rs +++ b/tests-fuzz/src/context.rs @@ -200,6 +200,15 @@ impl TableContext { partitions.remove_bound(removed_idx)?; partition_def.exprs = partitions.generate()?; } + RepartitionExpr::AlterPartitions(partition) => { + ensure!( + self.partition.is_none(), + error::UnexpectedSnafu { + violated: format!("Table {} already has partition", self.name), + } + ); + self.partition = Some(partition.partition); + } } Ok(self) diff --git a/tests-fuzz/src/generator/create_expr.rs b/tests-fuzz/src/generator/create_expr.rs index 261a310db2..401c0fa9ff 100644 --- a/tests-fuzz/src/generator/create_expr.rs +++ b/tests-fuzz/src/generator/create_expr.rs @@ -44,6 +44,7 @@ pub struct CreateTableExprGenerator { #[builder(setter(into))] engine: String, partition: usize, + partition_column: bool, if_not_exists: bool, #[builder(setter(into))] name: Ident, @@ -67,6 +68,7 @@ impl Default for CreateTableExprGenerator { engine: DEFAULT_ENGINE.to_string(), if_not_exists: false, partition: 0, + partition_column: false, name: Ident::new(""), with_clause: HashMap::default(), name_generator: Box::new(MappedGenerator::new(WordGenerator, random_capitalize_map)), @@ -95,7 +97,7 @@ impl Generator for CreateTableExprGenerato let mut builder = CreateTableExprBuilder::default(); let mut columns = Vec::with_capacity(self.columns); let mut primary_keys = vec![]; - let need_partible_column = self.partition > 1; + let need_partible_column = self.partition > 1 || self.partition_column; let mut column_names = self.name_generator.choose(rng, self.columns); if self.columns == 1 { @@ -123,13 +125,15 @@ impl Generator for CreateTableExprGenerato ) .remove(0); - // Generates partition bounds. - let partition_def = generate_partition_def( - self.partition, - column.column_type.clone(), - name.clone(), - ); - builder.partition(partition_def); + if self.partition > 1 { + // Generates partition bounds. + let partition_def = generate_partition_def( + self.partition, + column.column_type.clone(), + name.clone(), + ); + builder.partition(partition_def); + } columns.push(column); } // Generates the ts column. @@ -178,11 +182,12 @@ impl Generator for CreateTableExprGenerato } } -fn generate_partition_def( +pub fn generate_partition_def( partitions: usize, column_type: ConcreteDataType, column_name: Ident, ) -> PartitionDef { + assert!(partitions > 1, "partitions must be greater than 1"); let bounds = generate_partition_bounds(&column_type, partitions - 1); let partitions = SimplePartitions::new(column_name.clone(), bounds); let partition_exprs = partitions.generate().unwrap(); @@ -193,24 +198,23 @@ fn generate_partition_def( } } -fn generate_metric_partition(partitions: usize) -> Option<(Column, PartitionDef)> { - if partitions <= 1 { - return None; - } - - let partition_column = Column { +fn metric_partition_column() -> Column { + Column { name: Ident::new("host"), column_type: ConcreteDataType::string_datatype(), options: vec![ColumnOption::PrimaryKey], - }; + } +} + +pub fn generate_metric_partition_def(partitions: usize) -> PartitionDef { + assert!(partitions > 1, "partitions must be greater than 1"); + let partition_column = metric_partition_column(); let bounds = generate_partition_bounds(&partition_column.column_type, partitions - 1); let partitions = SimplePartitions::new(partition_column.name.clone(), bounds); - let partition_def = PartitionDef { + PartitionDef { columns: vec![partitions.column_name.clone()], exprs: partitions.generate().unwrap(), - }; - - Some((partition_column, partition_def)) + } } /// Generate a physical table with 2 columns: ts of TimestampType::Millisecond as time index and val of Float64Type. @@ -223,6 +227,8 @@ pub struct CreatePhysicalTableExprGenerator { if_not_exists: bool, #[builder(default = "0")] partition: usize, + #[builder(default = "false")] + partition_column: bool, #[builder(default, setter(into))] with_clause: HashMap, } @@ -252,11 +258,13 @@ impl Generator for CreatePhysicalTableExpr let mut partition = None; let mut primary_keys = vec![]; - if let Some((partition_column, partition_def)) = generate_metric_partition(self.partition) { - columns.push(partition_column); - partition = Some(partition_def); + if self.partition > 1 || self.partition_column { + columns.push(metric_partition_column()); primary_keys.push(columns.len() - 1); } + if self.partition > 1 { + partition = Some(generate_metric_partition_def(self.partition)); + } Ok(CreateTableExpr { table_name: self.name_generator.generate(rng), @@ -387,6 +395,7 @@ mod tests { use super::*; use crate::context::TableContext; + use crate::ir::PARTIBLE_DATA_TYPES; #[test] fn test_float64() { @@ -423,6 +432,18 @@ mod tests { .unwrap(); assert_eq!(expr.columns.len(), 10); assert!(expr.partition.is_none()); + + let expr = CreateTableExprGeneratorBuilder::default() + .columns(10) + .partition(1) + .partition_column(true) + .build() + .unwrap() + .generate(&mut rng) + .unwrap(); + assert_eq!(expr.columns.len(), 10); + assert!(expr.partition.is_none()); + assert!(PARTIBLE_DATA_TYPES.contains(&expr.columns[0].column_type)); } #[test] @@ -516,6 +537,25 @@ mod tests { assert_eq!(physical_table_expr.partition.unwrap().exprs.len(), 3); } + #[test] + fn test_create_physical_table_expr_generator_with_partition_column() { + let mut rng = rand::rng(); + let physical_table_expr = CreatePhysicalTableExprGeneratorBuilder::default() + .partition(1) + .partition_column(true) + .if_not_exists(false) + .build() + .unwrap() + .generate(&mut rng) + .unwrap(); + + assert_eq!(physical_table_expr.engine, "metric"); + assert!(physical_table_expr.partition.is_none()); + assert_eq!(physical_table_expr.columns.len(), 3); + assert_eq!(physical_table_expr.columns[2].name, Ident::new("host")); + assert_eq!(physical_table_expr.primary_keys, vec![2]); + } + #[test] fn test_create_logical_table_expr_generator_without_partition_column() { let mut rng = rand::rng(); diff --git a/tests-fuzz/src/ir.rs b/tests-fuzz/src/ir.rs index ce1628cd61..e0b57ad0d1 100644 --- a/tests-fuzz/src/ir.rs +++ b/tests-fuzz/src/ir.rs @@ -30,7 +30,7 @@ use std::time::Duration; pub use alter_expr::{AlterTableExpr, AlterTableOption}; use common_time::timestamp::TimeUnit; use common_time::{Date, Timestamp}; -pub use create_expr::{CreateDatabaseExpr, CreateTableExpr}; +pub use create_expr::{CreateDatabaseExpr, CreateTableExpr, PartitionDef}; use datatypes::data_type::ConcreteDataType; use datatypes::types::TimestampType; use datatypes::value::Value; @@ -40,7 +40,7 @@ use lazy_static::lazy_static; pub use partition_expr::SimplePartitions; use rand::Rng; use rand::seq::{IndexedRandom, SliceRandom}; -pub use repartition_expr::RepartitionExpr; +pub use repartition_expr::{AlterTablePartitionsExpr, RepartitionExpr}; use serde::{Deserialize, Serialize}; use self::insert_expr::RowValues; diff --git a/tests-fuzz/src/ir/repartition_expr.rs b/tests-fuzz/src/ir/repartition_expr.rs index 5c8b401c8d..d17ed90d2c 100644 --- a/tests-fuzz/src/ir/repartition_expr.rs +++ b/tests-fuzz/src/ir/repartition_expr.rs @@ -16,6 +16,7 @@ use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use crate::ir::Ident; +use crate::ir::create_expr::PartitionDef; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SplitPartitionExpr { @@ -34,10 +35,19 @@ pub struct MergePartitionExpr { pub wait: bool, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AlterTablePartitionsExpr { + pub table_name: Ident, + pub partition: PartitionDef, + #[serde(default = "default_wait")] + pub wait: bool, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum RepartitionExpr { Split(SplitPartitionExpr), Merge(MergePartitionExpr), + AlterPartitions(AlterTablePartitionsExpr), } const fn default_wait() -> bool { diff --git a/tests-fuzz/src/translator/mysql/repartition_expr.rs b/tests-fuzz/src/translator/mysql/repartition_expr.rs index 56c27594b3..43ca8e57ef 100644 --- a/tests-fuzz/src/translator/mysql/repartition_expr.rs +++ b/tests-fuzz/src/translator/mysql/repartition_expr.rs @@ -15,7 +15,10 @@ use partition::expr::PartitionExpr; use crate::error::Result; -use crate::ir::repartition_expr::{MergePartitionExpr, RepartitionExpr, SplitPartitionExpr}; +use crate::ir::create_expr::PartitionDef; +use crate::ir::repartition_expr::{ + AlterTablePartitionsExpr, MergePartitionExpr, RepartitionExpr, SplitPartitionExpr, +}; use crate::translator::DslTranslator; pub struct RepartitionExprTranslator; @@ -59,10 +62,38 @@ impl DslTranslator for RepartitionExprTranslator { table_name, merge_exprs, wait_clause )) } + RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr { + table_name, + partition, + wait, + }) => { + let partition_clause = format_partition_clause(partition); + let wait_clause = format_wait_clause(*wait); + Ok(format!( + "ALTER TABLE {} {}{};", + table_name, partition_clause, wait_clause + )) + } } } } +fn format_partition_clause(partition: &PartitionDef) -> String { + let columns = partition + .columns + .iter() + .map(|column| column.to_string()) + .collect::>() + .join(", "); + let exprs = partition + .exprs + .iter() + .map(format_partition_expr_sql) + .collect::>() + .join(",\n "); + format!("PARTITION ON COLUMNS ({columns}) (\n {exprs}\n)") +} + fn format_partition_expr_sql(expr: &PartitionExpr) -> String { expr.to_parser_expr().to_string() } @@ -79,9 +110,15 @@ fn format_wait_clause(wait: bool) -> String { mod tests { use datatypes::value::Value; use partition::expr::col; + use sql::dialect::GreptimeDbDialect; + use sql::parser::{ParseOptions, ParserContext}; use super::RepartitionExprTranslator; - use crate::ir::repartition_expr::{MergePartitionExpr, RepartitionExpr, SplitPartitionExpr}; + use crate::ir::Ident; + use crate::ir::create_expr::PartitionDef; + use crate::ir::repartition_expr::{ + AlterTablePartitionsExpr, MergePartitionExpr, RepartitionExpr, SplitPartitionExpr, + }; use crate::translator::DslTranslator; #[test] @@ -149,4 +186,61 @@ mod tests { );"#; assert_eq!(sql, expected); } + + #[test] + fn test_translate_alter_table_partitions_expr() { + let expr = RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr { + table_name: "demo".into(), + partition: PartitionDef { + columns: vec![Ident::new("id")], + exprs: vec![ + col("id").lt(Value::Int32(10)), + col("id") + .gt_eq(Value::Int32(10)) + .and(col("id").lt(Value::Int32(20))), + col("id").gt_eq(Value::Int32(20)), + ], + }, + wait: true, + }); + let sql = RepartitionExprTranslator.translate(&expr).unwrap(); + let expected = r#"ALTER TABLE demo PARTITION ON COLUMNS (id) ( + id < 10, + id >= 10 AND id < 20, + id >= 20 +);"#; + assert_eq!(sql, expected); + assert_repartition_sql_parseable(&sql); + } + + #[test] + fn test_translate_alter_table_partitions_expr_wait_false() { + let expr = RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr { + table_name: "demo".into(), + partition: PartitionDef { + columns: vec![Ident::new("host")], + exprs: vec![ + col("host").lt(Value::from("m")), + col("host").gt_eq(Value::from("m")), + ], + }, + wait: false, + }); + let sql = RepartitionExprTranslator.translate(&expr).unwrap(); + let expected = r#"ALTER TABLE demo PARTITION ON COLUMNS (host) ( + host < 'm', + host >= 'm' +) WITH ( + WAIT = false +);"#; + assert_eq!(sql, expected); + assert_repartition_sql_parseable(&sql); + } + + fn assert_repartition_sql_parseable(sql: &str) { + let statements = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(statements.len(), 1); + } } diff --git a/tests-fuzz/src/validator/partition.rs b/tests-fuzz/src/validator/partition.rs index 3fbc8e7f86..581fb8a635 100644 --- a/tests-fuzz/src/validator/partition.rs +++ b/tests-fuzz/src/validator/partition.rs @@ -21,7 +21,8 @@ use crate::ir::Ident; use crate::ir::create_expr::PartitionDef; const PARTITIONS_INFO_SCHEMA_SQL: &str = "SELECT table_catalog, table_schema, table_name, \ -partition_name, partition_expression, partition_description, greptime_partition_id, \ +partition_name, COALESCE(partition_expression, '') AS partition_expression, \ +COALESCE(partition_description, '') AS partition_description, greptime_partition_id, \ partition_ordinal_position FROM information_schema.partitions WHERE table_name = ? \ ORDER BY partition_ordinal_position;"; @@ -91,3 +92,20 @@ pub fn assert_partitions(expected: &PartitionDef, actual: &[PartitionInfo]) -> R Ok(()) } + +/// Asserts that the table has no partition metadata in information schema. +pub fn assert_unpartitioned(actual: &[PartitionInfo]) -> Result<()> { + let has_no_partition_metadata = actual.is_empty() + || (actual.len() == 1 + && actual[0].partition_expression.is_empty() + && actual[0].partition_description.is_empty()); + + ensure!( + has_no_partition_metadata, + error::AssertSnafu { + reason: format!("Expected unpartitioned table, got partitions: {actual:?}"), + } + ); + + Ok(()) +} diff --git a/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs b/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs index 7932bc7759..8a6bd81fa8 100644 --- a/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs +++ b/tests-fuzz/targets/ddl/fuzz_repartition_metric_table.rs @@ -36,14 +36,15 @@ use tests_fuzz::fake::{ use tests_fuzz::generator::Generator; use tests_fuzz::generator::create_expr::{ CreateLogicalTableExprGeneratorBuilder, CreatePhysicalTableExprGeneratorBuilder, + generate_metric_partition_def, }; use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; use tests_fuzz::generator::repartition_expr::{ MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder, }; use tests_fuzz::ir::{ - CreateTableExpr, Ident, InsertIntoExpr, RepartitionExpr, generate_random_value, - generate_unique_timestamp_for_mysql_with_clock, + AlterTablePartitionsExpr, CreateTableExpr, Ident, InsertIntoExpr, PartitionDef, + RepartitionExpr, generate_random_value, generate_unique_timestamp_for_mysql_with_clock, }; use tests_fuzz::translator::DslTranslator; use tests_fuzz::translator::csv::InsertExprToCsvRecordsTranslator; @@ -94,6 +95,7 @@ fn generate_create_physical_table_expr( )))) .if_not_exists(rng.random_bool(0.5)) .partition(partitions) + .partition_column(partitions <= 1) .build() .unwrap() .generate(rng) @@ -158,12 +160,6 @@ async fn create_metric_tables( })?; info!("Create physical table: {create_physical_sql}, result: {result:?}"); let physical_table_ctx = Arc::new(TableContext::from(&create_physical_expr)); - ensure!( - physical_table_ctx.partition.is_some(), - error::AssertSnafu { - reason: "Physical metric table must have partition".to_string() - } - ); let mut logical_tables = BTreeMap::new(); let mut create_logical_sqls = HashMap::new(); @@ -436,6 +432,11 @@ fn repartition_operation( table_ctx: &TableContextRef, rng: &mut R, ) -> Result { + if table_ctx.partition.is_none() { + let partition = generate_metric_partition_def(rng.random_range(2..8)); + return Ok(alter_table_partitions_expr(table_ctx, partition, true)); + } + let split = rng.random_bool(0.5); if table_ctx.partition.as_ref().unwrap().exprs.len() <= 2 || split { let expr = SplitPartitionExprGeneratorBuilder::default() @@ -454,19 +455,35 @@ fn repartition_operation( } } +fn alter_table_partitions_expr( + table_ctx: &TableContextRef, + partition: PartitionDef, + wait: bool, +) -> RepartitionExpr { + RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr { + table_name: table_ctx.name.clone(), + partition, + wait, + }) +} + impl Arbitrary<'_> for FuzzInput { fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { let seed = get_fuzz_override::("SEED").unwrap_or(u.int_in_range(u64::MIN..=u64::MAX)?); let mut rng = ChaChaRng::seed_from_u64(seed); - let partitions = - get_fuzz_override::("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8)); + let partitions = get_fuzz_override::("PARTITIONS").unwrap_or_else(|| { + if rng.random_bool(0.5) { + 1 + } else { + rng.random_range(2..8) + } + }); let max_tables = get_gt_fuzz_input_max_tables(); let tables = get_fuzz_override::("TABLES") .unwrap_or_else(|| rng.random_range(1..=std::cmp::max(1, max_tables))); - let max_actions = get_gt_fuzz_input_max_alter_actions(); + let max_actions = std::cmp::min(128, get_gt_fuzz_input_max_alter_actions()); let actions = get_fuzz_override::("ACTIONS") .unwrap_or_else(|| rng.random_range(1..max_actions)); - Ok(FuzzInput { seed, actions, @@ -536,7 +553,11 @@ async fn execute_repartition_metric_table(ctx: FuzzContext, input: FuzzInput) -> tokio::time::sleep(Duration::from_millis(100)).await; for i in 0..input.actions { - let partition_num = physical_table_ctx.partition.as_ref().unwrap().exprs.len(); + let partition_num = physical_table_ctx + .partition + .as_ref() + .map(|partition| partition.exprs.len()) + .unwrap_or_default(); info!( "partition_num: {partition_num}, action: {}/{}, table: {}, logical table num: {}", i + 1, diff --git a/tests-fuzz/targets/ddl/fuzz_repartition_table.rs b/tests-fuzz/targets/ddl/fuzz_repartition_table.rs index d4b9e9fd7a..4f6a014f2e 100644 --- a/tests-fuzz/targets/ddl/fuzz_repartition_table.rs +++ b/tests-fuzz/targets/ddl/fuzz_repartition_table.rs @@ -33,14 +33,15 @@ use tests_fuzz::fake::{ uppercase_and_keyword_backtick_map, }; use tests_fuzz::generator::Generator; -use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder; +use tests_fuzz::generator::create_expr::{CreateTableExprGeneratorBuilder, generate_partition_def}; use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; use tests_fuzz::generator::repartition_expr::{ MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder, }; use tests_fuzz::ir::{ - CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, RepartitionExpr, RowValue, - SimplePartitions, generate_partition_value, generate_unique_timestamp_for_mysql_with_clock, + AlterTablePartitionsExpr, CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, + PartitionDef, RepartitionExpr, RowValue, SimplePartitions, generate_partition_value, + generate_unique_timestamp_for_mysql_with_clock, }; use tests_fuzz::translator::DslTranslator; use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; @@ -75,8 +76,13 @@ impl Arbitrary<'_> for FuzzInput { fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result { let seed = get_fuzz_override::("SEED").unwrap_or(u.int_in_range(u64::MIN..=u64::MAX)?); let mut rng = ChaChaRng::seed_from_u64(seed); - let partitions = - get_fuzz_override::("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8)); + let partitions = get_fuzz_override::("PARTITIONS").unwrap_or_else(|| { + if rng.random_bool(0.5) { + 1 + } else { + rng.random_range(2..8) + } + }); let max_actions = get_gt_fuzz_input_max_alter_actions(); let actions = get_fuzz_override::("ACTIONS") .unwrap_or_else(|| rng.random_range(1..max_actions)); @@ -99,6 +105,7 @@ fn generate_create_expr( ))) .columns(5) .partition(input.partitions) + .partition_column(input.partitions <= 1) .engine("mito") .ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator)) .build() @@ -122,7 +129,7 @@ fn build_insert_expr( let ts_value_generator = generate_unique_timestamp_for_mysql_with_clock(clock.clone()); let counter = Arc::new(AtomicUsize::new(0)); let counter_clone = counter.clone(); - let partition_len = table_ctx.partition.as_ref().unwrap().exprs.len(); + let partition_len = partitions.bounds.len() + 1; let row = rng.random_range(partition_len..partition_len * 2); let moved_partitions = partitions.clone(); @@ -150,6 +157,28 @@ fn build_insert_expr( insert_generator.generate(rng).unwrap() } +fn alter_table_partitions_expr( + table_ctx: &TableContextRef, + partition: PartitionDef, + wait: bool, +) -> RepartitionExpr { + RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr { + table_name: table_ctx.name.clone(), + partition, + wait, + }) +} + +fn alter_table_partitions_expr_from_table_ctx( + table_ctx: &TableContextRef, + rng: &mut R, + wait: bool, +) -> RepartitionExpr { + let column = table_ctx.columns[0].clone(); + let partition = generate_partition_def(rng.random_range(2..8), column.column_type, column.name); + alter_table_partitions_expr(table_ctx, partition, wait) +} + async fn execute_insert_with_retry(ctx: &FuzzContext, sql: &str) -> Result<()> { let mut delay = Duration::from_millis(100); let mut attempt = 0; @@ -236,9 +265,36 @@ async fn execute_repartition_table(ctx: FuzzContext, input: FuzzInput) -> Result inserted_rows: 0, })); + let mut action_start = 0; + if table_ctx.partition.is_none() { + let expr = alter_table_partitions_expr_from_table_ctx(&table_ctx, &mut rng, true); + let translator = RepartitionExprTranslator; + let sql = translator.translate(&expr)?; + info!("Initial partition sql: {sql}"); + let result = sqlx::query(&sql) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Initial partition result: {result:?}"); + table_ctx = Arc::new(Arc::unwrap_or_clone(table_ctx).repartition(expr).unwrap()); + shared_state.lock().unwrap().table_ctx = table_ctx.clone(); + + let partition_entries = validator::partition::fetch_partitions_info_schema( + &ctx.greptime, + "public".into(), + &table_ctx.name, + ) + .await?; + validator::partition::assert_partitions( + table_ctx.partition.as_ref().unwrap(), + &partition_entries, + )?; + action_start = 1; + } + let writer_rng = ChaChaRng::seed_from_u64(input.seed); let writer_task = tokio::spawn(write_loop(writer_rng, ctx.clone(), shared_state.clone())); - for i in 0..input.actions { + for i in action_start..input.actions { let partition_num = table_ctx.partition.as_ref().unwrap().exprs.len(); info!( "partition_num: {partition_num}, action: {}/{}", diff --git a/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs b/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs index d3789b696c..9d8faeebf5 100644 --- a/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs +++ b/tests-fuzz/targets/ddl/fuzz_repartition_table_chaos.rs @@ -34,14 +34,15 @@ use tests_fuzz::fake::{ uppercase_and_keyword_backtick_map, }; use tests_fuzz::generator::Generator; -use tests_fuzz::generator::create_expr::CreateTableExprGeneratorBuilder; +use tests_fuzz::generator::create_expr::{CreateTableExprGeneratorBuilder, generate_partition_def}; use tests_fuzz::generator::insert_expr::InsertExprGeneratorBuilder; use tests_fuzz::generator::repartition_expr::{ MergePartitionExprGeneratorBuilder, SplitPartitionExprGeneratorBuilder, }; use tests_fuzz::ir::{ - CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, RepartitionExpr, RowValue, - SimplePartitions, generate_partition_value, generate_unique_timestamp_for_mysql_with_clock, + AlterTablePartitionsExpr, CreateTableExpr, InsertIntoExpr, MySQLTsColumnTypeGenerator, + PartitionDef, RepartitionExpr, RowValue, SimplePartitions, generate_partition_value, + generate_unique_timestamp_for_mysql_with_clock, }; use tests_fuzz::translator::DslTranslator; use tests_fuzz::translator::mysql::create_expr::CreateTableExprTranslator; @@ -93,13 +94,17 @@ impl Arbitrary<'_> for FuzzInput { let mut rng = ChaChaRng::seed_from_u64(seed); let rows = get_fuzz_override::("ROWS") .unwrap_or_else(|| rng.random_range(2..get_gt_fuzz_input_max_rows())); - let partitions = - get_fuzz_override::("PARTITIONS").unwrap_or_else(|| rng.random_range(2..8)); + let partitions = get_fuzz_override::("PARTITIONS").unwrap_or_else(|| { + if rng.random_bool(0.5) { + 1 + } else { + rng.random_range(2..8) + } + }); let chaos_delay_ms = get_fuzz_override::("CHAOS_DELAY_MS").unwrap_or_else(|| rng.random_range(0..5000)); let chaos_hold_secs = get_fuzz_override::("CHAOS_HOLD_SECS").unwrap_or_else(|| rng.random_range(10..20)); - Ok(FuzzInput { seed, rows, @@ -127,6 +132,7 @@ fn generate_create_expr( ))) .columns(5) .partition(input.partitions) + .partition_column(input.partitions <= 1) .engine("mito") .ts_column_type_generator(Box::new(MySQLTsColumnTypeGenerator)) .build() @@ -144,7 +150,7 @@ fn build_insert_expr( let ts_value_generator = generate_unique_timestamp_for_mysql_with_clock(clock.clone()); let counter = Arc::new(AtomicUsize::new(0)); let counter_clone = counter.clone(); - let partition_len = table_ctx.partition.as_ref().unwrap().exprs.len(); + let partition_len = partitions.bounds.len() + 1; let moved_partitions = partitions.clone(); let insert_generator = InsertExprGeneratorBuilder::default() .table_ctx(table_ctx.clone()) @@ -202,10 +208,12 @@ async fn create_table(ctx: &FuzzContext, expr: &CreateTableExpr) -> Result( ctx: &FuzzContext, table_ctx: &TableContextRef, + partition_def: &PartitionDef, rng: &mut R, rows: usize, ) -> Result { - let partitions = SimplePartitions::from_table_ctx(table_ctx).unwrap(); + let partitions = + SimplePartitions::from_exprs(partition_def.columns[0].clone(), &partition_def.exprs)?; let clock = Arc::new(Mutex::new(Timestamp::current_millis())); let insert_expr = build_insert_expr(table_ctx, rng, &partitions, &clock, rows); let inserted_rows = insert_expr.values_list.len() as u64; @@ -260,6 +268,28 @@ fn repartition_operation( } } +fn alter_table_partitions_expr( + table_ctx: &TableContextRef, + partition: PartitionDef, + wait: bool, +) -> RepartitionExpr { + RepartitionExpr::AlterPartitions(AlterTablePartitionsExpr { + table_name: table_ctx.name.clone(), + partition, + wait, + }) +} + +fn alter_table_partitions_expr_from_table_ctx( + table_ctx: &TableContextRef, + rng: &mut R, + wait: bool, +) -> RepartitionExpr { + let column = table_ctx.columns[0].clone(); + let partition = generate_partition_def(rng.random_range(2..8), column.column_type, column.name); + alter_table_partitions_expr(table_ctx, partition, wait) +} + async fn submit_repartition_procedure(ctx: &FuzzContext, expr: &RepartitionExpr) -> Result { let translator = RepartitionExprTranslator; let sql = translator.translate(expr)?; @@ -334,10 +364,13 @@ async fn validate_terminal_metadata( after_table_ctx.partition.as_ref().unwrap(), &partition_entries, )?, - ProcedureTerminalState::Failed => validator::partition::assert_partitions( - before_table_ctx.partition.as_ref().unwrap(), - &partition_entries, - )?, + ProcedureTerminalState::Failed => { + if let Some(partition) = before_table_ctx.partition.as_ref() { + validator::partition::assert_partitions(partition, &partition_entries)?; + } else { + validator::partition::assert_unpartitioned(&partition_entries)?; + } + } } Ok(()) @@ -359,7 +392,21 @@ async fn execute_repartition_chaos(ctx: FuzzContext, input: FuzzInput) -> Result let create_expr = generate_create_expr(&input, &mut rng)?; let before_table_ctx = create_table(&ctx, &create_expr).await?; - let inserted_rows = insert_initial_rows(&ctx, &before_table_ctx, &mut rng, input.rows).await?; + let insert_partition = create_expr.partition.clone().unwrap_or_else(|| { + generate_partition_def( + 2, + before_table_ctx.columns[0].column_type.clone(), + before_table_ctx.columns[0].name.clone(), + ) + }); + let inserted_rows = insert_initial_rows( + &ctx, + &before_table_ctx, + &insert_partition, + &mut rng, + input.rows, + ) + .await?; validate_table_rows(&ctx, &before_table_ctx, inserted_rows).await?; let before_entries = validator::partition::fetch_partitions_info_schema( @@ -370,7 +417,11 @@ async fn execute_repartition_chaos(ctx: FuzzContext, input: FuzzInput) -> Result .await?; info!("Before repartition partition entries: {before_entries:?}"); - let repartition_expr = repartition_operation(&before_table_ctx, &mut rng, false)?; + let repartition_expr = if before_table_ctx.partition.is_some() { + repartition_operation(&before_table_ctx, &mut rng, false)? + } else { + alter_table_partitions_expr_from_table_ctx(&before_table_ctx, &mut rng, false) + }; let after_table_ctx = Arc::new( Arc::unwrap_or_clone(before_table_ctx.clone()) .repartition(repartition_expr.clone())