diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 550887e315..8dceeb2e5a 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::alter_table_expr::Kind; -use api::v1::repartition::Source; +use api::v1::repartition::Source as PbRepartitionSource; use api::v1::{PartitionExprs, Repartition}; use common_error::ext::BoxedError; use common_procedure::{ @@ -49,7 +49,7 @@ use crate::error::{ self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, - UnexpectedLogicalRouteTableSnafu, UnexpectedSnafu, WaitProcedureSnafu, + UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu, }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; @@ -152,13 +152,18 @@ macro_rules! 
procedure_loader { pub type RepartitionProcedureFactoryRef = Arc; +pub enum RepartitionSource { + Partitioned { exprs: Vec }, + Unpartitioned { partition_columns: Vec }, +} + pub trait RepartitionProcedureFactory: Send + Sync { fn create( &self, ddl_ctx: &DdlContext, table_name: TableName, table_id: TableId, - from_exprs: Vec, + source: RepartitionSource, to_exprs: Vec, timeout: Option, ) -> std::result::Result; @@ -290,18 +295,19 @@ impl DdlManager { let into_partition_exprs = repartition.into_partition_exprs; let source = repartition.source; - let from_partition_exprs = match source { - Some(Source::PartitionExprs(PartitionExprs { exprs })) => exprs, - Some(Source::Unpartitioned(_)) => { - return UnexpectedSnafu { - err_msg: "Unpartitioned repartition source is not supported yet".to_string(), - } - .fail(); + let source = match source { + Some(PbRepartitionSource::PartitionExprs(PartitionExprs { exprs })) => { + RepartitionSource::Partitioned { exprs } } + Some(PbRepartitionSource::Unpartitioned(source)) => RepartitionSource::Unpartitioned { + partition_columns: source.partition_columns, + }, None => { // Reads the deprecated field for backward compatibility with old persisted DDL tasks. 
#[allow(deprecated)] - repartition.from_partition_exprs + RepartitionSource::Partitioned { + exprs: repartition.from_partition_exprs, + } } }; @@ -311,7 +317,7 @@ impl DdlManager { &context, table_name, table_id, - from_partition_exprs, + source, into_partition_exprs, Some(timeout), ) @@ -1124,7 +1130,7 @@ mod tests { use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl}; - use crate::ddl_manager::RepartitionProcedureFactory; + use crate::ddl_manager::{RepartitionProcedureFactory, RepartitionSource}; use crate::key::TableMetadataManager; use crate::key::flow::FlowMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; @@ -1162,7 +1168,7 @@ mod tests { _ddl_ctx: &DdlContext, _table_name: TableName, _table_id: TableId, - _from_exprs: Vec, + _source: RepartitionSource, _to_exprs: Vec, _timeout: Option, ) -> std::result::Result { diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index be060b7424..c1819cb364 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -20,6 +20,7 @@ pub mod group; pub mod plan; pub mod repartition_end; pub mod repartition_start; +pub mod update_partition_metadata; pub mod utils; use std::any::Any; @@ -32,7 +33,7 @@ use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::DdlContext; use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef; use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef; -use common_meta::ddl_manager::RepartitionProcedureFactory; +use common_meta::ddl_manager::{RepartitionProcedureFactory, RepartitionSource}; use common_meta::instruction::CacheIdent; use common_meta::key::datanode_table::RegionInfo; use common_meta::key::table_info::TableInfoValue; @@ -62,7 +63,8 @@ use crate::procedure::repartition::group::{ Context as RepartitionGroupContext, 
RepartitionGroupProcedure, region_routes, }; use crate::procedure::repartition::plan::RepartitionPlanEntry; -use crate::procedure::repartition::repartition_start::RepartitionStart; +use crate::procedure::repartition::repartition_start::{RepartitionFrom, RepartitionStart}; +use crate::procedure::repartition::update_partition_metadata::PartitionMetadataUpdate; use crate::procedure::repartition::utils::{ get_datanode_table_value, rollback_group_metadata_routes, }; @@ -93,6 +95,9 @@ pub struct PersistentContext { /// The timeout for repartition operations. #[serde(with = "humantime_serde", default = "default_timeout")] pub timeout: Duration, + #[serde(default)] + /// Records table-level partition metadata added by this repartition. + pub partition_metadata_update: Option, } fn default_timeout() -> Duration { @@ -121,6 +126,7 @@ impl PersistentContext { failed_procedures: vec![], unknown_procedures: vec![], timeout: timeout.unwrap_or_else(default_timeout), + partition_metadata_update: None, } } @@ -317,7 +323,9 @@ impl Context { /// /// Abort: /// - Table info not found. - pub async fn get_table_info_value(&self) -> Result { + pub async fn get_raw_table_info_value( + &self, + ) -> Result> { let table_id = self.persistent_ctx.table_id; let table_info_value = self .table_metadata_manager @@ -328,11 +336,36 @@ impl Context { .with_context(|_| error::RetryLaterWithSourceSnafu { reason: format!("Failed to get table info for table: {}", table_id), })? - .context(error::TableInfoNotFoundSnafu { table_id })? - .into_inner(); + .context(error::TableInfoNotFoundSnafu { table_id })?; + Ok(table_info_value) } + pub async fn get_table_info_value(&self) -> Result { + let table_info_value = self.get_raw_table_info_value().await?.into_inner(); + Ok(table_info_value) + } + + /// Updates the table info. 
+ pub async fn update_table_info( + &self, + current_table_info_value: &DeserializedValueWithBytes, + new_table_info_value: TableInfoValue, + ) -> Result<()> { + let table_id = self.persistent_ctx.table_id; + self.table_metadata_manager + .update_table_info( + current_table_info_value, + None, + new_table_info_value.table_info, + ) + .await + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to update table info for table: {}", table_id), + }) + } + /// Updates the table route. /// /// Retry: @@ -469,12 +502,8 @@ struct RepartitionDataOwned { impl RepartitionProcedure { const TYPE_NAME: &'static str = "metasrv-procedure::Repartition"; - pub fn new( - from_exprs: Vec, - to_exprs: Vec, - context: Context, - ) -> Self { - let state = Box::new(RepartitionStart::new(from_exprs, to_exprs)); + pub fn new(from: RepartitionFrom, to_exprs: Vec, context: Context) -> Self { + let state = Box::new(RepartitionStart::new(from, to_exprs)); Self { state, context } } @@ -492,24 +521,24 @@ impl RepartitionProcedure { Ok(Self { state, context }) } - /// Returns whether parent rollback should remove this repartition's allocated regions. + /// Returns whether parent rollback should run. /// - /// This uses an "after AllocateRegion" semantic: once execution reaches - /// `AllocateRegion` or any later state, rollback must try to remove this round's - /// `allocated_region_ids` from table-route metadata when they exist. - /// - /// State flow: - /// `RepartitionStart -> AllocateRegion -> Dispatch -> Collect -> DeallocateRegion -> RepartitionEnd` - /// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - /// rollback allocated regions in metadata + /// This uses an "after repartition metadata update" semantic: once execution + /// reaches `UpdatePartitionMetadata` or any later rollback-active state, + /// rollback must try to clean metadata written by the repartition procedure. 
/// /// Notes: - /// - `RepartitionStart`: no-op, because allocation has not happened yet. - /// - `AllocateRegion` / `Dispatch` / `Collect` rollback-active. + /// - `RepartitionStart`: no-op, because no metadata has been updated yet. + /// - `UpdatePartitionMetadata`: rollback table partition metadata. + /// - `AllocateRegion` / `Dispatch` / `Collect`: rollback table partition metadata + /// and allocated region metadata. /// - `DeallocateRegion`: is not rollback-active. /// - `RepartitionEnd`: no-op. - fn should_rollback_allocated_regions(&self) -> bool { - self.state.as_any().is::() + fn should_rollback(&self) -> bool { + self.state + .as_any() + .is::() + || self.state.as_any().is::() || self.state.as_any().is::() || self.state.as_any().is::() } @@ -526,7 +555,7 @@ impl RepartitionProcedure { /// Returns allocated region ids that parent rollback should remove. /// - /// Rollback uses an "after AllocateRegion" semantic: + /// Rollback uses an "after region allocation" semantic: /// - in `AllocateRegion` and `Dispatch`, all allocated regions belong to the /// current repartition attempt and must be cleaned up. 
/// - in `Collect`, only the plans referenced by failed or unknown @@ -586,8 +615,47 @@ impl RepartitionProcedure { Ok(()) } + async fn rollback_partition_metadata(&mut self) -> Result<()> { + let Some(update) = self + .context + .persistent_ctx + .partition_metadata_update + .as_ref() + else { + return Ok(()); + }; + if update.partition_key_indices.is_empty() { + return Ok(()); + } + + let table_info_value = self.context.get_raw_table_info_value().await?; + let mut new_partition_key_indices = table_info_value + .table_info + .meta + .partition_key_indices + .clone(); + new_partition_key_indices.retain(|idx| !update.partition_key_indices.contains(idx)); + if new_partition_key_indices == table_info_value.table_info.meta.partition_key_indices { + return Ok(()); + } + + let mut new_table_info = table_info_value.table_info.clone(); + new_table_info.meta.partition_key_indices = new_partition_key_indices; + self.context + .update_table_info(&table_info_value, table_info_value.update(new_table_info)) + .await?; + + // Do not invalidate the table cache here. The table routes may still + // contain partition expressions until `rollback_inner` rolls them back. + // Exposing cleared partition columns with partitioned routes can build + // an inconsistent partition rule. The cache is invalidated once after + // both partition metadata and routes are rolled back. 
+ + Ok(()) + } + async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> { - if !self.should_rollback_allocated_regions() { + if !self.should_rollback() { return Ok(()); } @@ -596,6 +664,8 @@ impl RepartitionProcedure { let table_lock = TableLock::Write(table_id).into(); let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; + + self.rollback_partition_metadata().await?; let table_route_value = self.context.get_table_route_value().await?; let original_region_routes = region_routes(table_id, table_route_value.get_inner_ref())?; let mut current_region_routes = original_region_routes.clone(); @@ -738,20 +808,28 @@ impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory { ddl_ctx: &DdlContext, table_name: TableName, table_id: TableId, - from_exprs: Vec, + source: RepartitionSource, to_exprs: Vec, timeout: Option, ) -> std::result::Result { let persistent_ctx = PersistentContext::new(table_name, table_id, timeout); - let from_exprs = from_exprs - .iter() - .map(|e| { - PartitionExpr::from_json_str(e) - .context(error::DeserializePartitionExprSnafu)? - .context(error::EmptyPartitionExprSnafu) - }) - .collect::>>() - .map_err(BoxedError::new)?; + let from = match source { + RepartitionSource::Partitioned { exprs } => { + let exprs = exprs + .iter() + .map(|e| { + PartitionExpr::from_json_str(e) + .context(error::DeserializePartitionExprSnafu)? 
+ .context(error::EmptyPartitionExprSnafu) + }) + .collect::>>() + .map_err(BoxedError::new)?; + RepartitionFrom::Partitioned { exprs } + } + RepartitionSource::Unpartitioned { partition_columns } => { + RepartitionFrom::Unpartitioned { partition_columns } + } + }; let to_exprs = to_exprs .iter() .map(|e| { @@ -763,7 +841,7 @@ impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory { .map_err(BoxedError::new)?; let procedure = RepartitionProcedure::new( - from_exprs, + from, to_exprs, Context::new( ddl_ctx, @@ -860,6 +938,9 @@ mod tests { new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr, test_region_route, test_region_wal_options, }; + use crate::procedure::repartition::update_partition_metadata::{ + PartitionMetadataUpdate, UpdatePartitionMetadata, + }; fn test_plan(table_id: TableId) -> RepartitionPlanEntry { RepartitionPlanEntry { @@ -927,6 +1008,15 @@ mod tests { .unwrap() } + async fn table_partition_key_indices(ctx: &Context) -> Vec { + ctx.get_table_info_value() + .await + .unwrap() + .table_info + .meta + .partition_key_indices + } + fn test_procedure(state: Box, context: Context) -> RepartitionProcedure { RepartitionProcedure { state, context } } @@ -965,34 +1055,43 @@ mod tests { } #[test] - fn test_should_rollback_allocated_regions() { + fn test_should_rollback_after_metadata_update() { let env = TestingEnv::new(); let table_id = 1024; let procedure = test_procedure( - Box::new(RepartitionStart::new(vec![], vec![])), + Box::new(RepartitionStart::new( + RepartitionFrom::Partitioned { exprs: vec![] }, + vec![], + )), test_context(&env, table_id), ); - assert!(!procedure.should_rollback_allocated_regions()); + assert!(!procedure.should_rollback()); + + let procedure = test_procedure( + Box::new(UpdatePartitionMetadata::new(vec![])), + test_context(&env, table_id), + ); + assert!(procedure.should_rollback()); let procedure = test_procedure( Box::new(AllocateRegion::new(vec![])), test_context(&env, 
table_id), ); - assert!(procedure.should_rollback_allocated_regions()); + assert!(procedure.should_rollback()); let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id)); - assert!(procedure.should_rollback_allocated_regions()); + assert!(procedure.should_rollback()); let procedure = test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id)); - assert!(procedure.should_rollback_allocated_regions()); + assert!(procedure.should_rollback()); let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id)); - assert!(!procedure.should_rollback_allocated_regions()); + assert!(!procedure.should_rollback()); let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id)); - assert!(!procedure.should_rollback_allocated_regions()); + assert!(!procedure.should_rollback()); } #[test] @@ -1048,6 +1147,68 @@ mod tests { ); } + #[test] + fn test_persistent_context_partition_metadata_update_serde_default() { + let json = r#"{ + "catalog_name":"test_catalog", + "schema_name":"test_schema", + "table_name":"test_table", + "table_id":1024, + "plans":[], + "timeout":"120s" + }"#; + + let persistent_ctx: PersistentContext = serde_json::from_str(json).unwrap(); + + assert!(persistent_ctx.partition_metadata_update.is_none()); + } + + #[tokio::test] + async fn test_repartition_rollback_removes_partition_metadata_indices() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + env.create_physical_table_metadata_for_repartition( + table_id, + vec![test_region_route(RegionId::new(table_id, 1), "")], + test_region_wal_options(&[1]), + ) + .await; + + let mut context = new_parent_context(&env, node_manager, table_id); + let current = context.get_raw_table_info_value().await.unwrap(); + let mut table_info = current.table_info.clone(); + table_info.meta.partition_key_indices = vec![0, 1]; + context + 
.update_table_info(¤t, current.update(table_info)) + .await + .unwrap(); + context.persistent_ctx.partition_metadata_update = Some(PartitionMetadataUpdate { + partition_key_indices: vec![0], + }); + let mut procedure = RepartitionProcedure { + state: Box::new(UpdatePartitionMetadata::new(vec![])), + context, + }; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + + assert_eq!( + procedure + .context + .get_table_info_value() + .await + .unwrap() + .table_info + .meta + .partition_key_indices, + vec![1] + ); + } + #[tokio::test] async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() { let env = TestingEnv::new(); @@ -1708,7 +1869,9 @@ mod tests { let context = new_parent_context(&env, node_manager, table_id); let mut procedure = RepartitionProcedure::new( - vec![range_expr("x", 0, 100)], + RepartitionFrom::Partitioned { + exprs: vec![range_expr("x", 0, 100)], + }, vec![range_expr("x", 0, 50), range_expr("x", 50, 100)], context, ); @@ -1810,6 +1973,226 @@ mod tests { ); } + #[tokio::test] + async fn test_repartition_procedure_flow_unpartitioned_failed_and_full_rollback() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + + env.create_physical_table_metadata_for_repartition( + table_id, + vec![test_region_route(RegionId::new(table_id, 1), "")], + test_region_wal_options(&[1]), + ) + .await; + + let context = new_parent_context(&env, node_manager, table_id); + let to_exprs = vec![range_expr("col1", 0, 50), range_expr("col1", 50, 100)]; + let mut procedure = RepartitionProcedure::new( + RepartitionFrom::Unpartitioned { + partition_columns: vec!["col1".to_string()], + }, + to_exprs.clone(), + context, + ); + + let start_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(start_status.need_persist()); + assert_parent_state::(&procedure); + assert_eq!( + procedure + .context + .persistent_ctx 
+ .partition_metadata_update + .as_ref() + .unwrap() + .partition_key_indices, + vec![0] + ); + + let update_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(update_status.need_persist()); + assert_parent_state::(&procedure); + assert_eq!( + table_partition_key_indices(&procedure.context).await, + vec![0] + ); + + let build_allocate_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(build_allocate_status.need_persist()); + assert_parent_state::(&procedure); + assert_eq!(procedure.context.persistent_ctx.plans.len(), 1); + let plan = &procedure.context.persistent_ctx.plans[0]; + assert_eq!( + plan.source_regions, + vec![SourceRegionDescriptor::Default { + region_id: RegionId::new(table_id, 1) + }] + ); + assert_eq!(plan.target_regions.len(), 2); + assert_eq!(plan.target_regions[0].region_id, RegionId::new(table_id, 1)); + assert_eq!(plan.target_regions[0].partition_expr, to_exprs[0]); + assert_eq!( + plan.allocated_region_ids, + vec![plan.target_regions[1].region_id] + ); + assert!(plan.pending_deallocate_region_ids.is_empty()); + assert_eq!(plan.transition_map, vec![vec![0, 1]]); + let target_regions = plan.target_regions.clone(); + + let execute_allocate_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(execute_allocate_status.need_persist()); + assert_parent_state::(&procedure); + let region_routes = current_parent_region_routes(&procedure.context).await; + assert_eq!(region_routes.len(), 2); + assert_eq!( + region_route_by_id(®ion_routes, target_regions[0].region_id) + .region + .partition_expr(), + "" + ); + assert_eq!( + region_route_by_id(®ion_routes, target_regions[1].region_id) + .region + .partition_expr(), + to_exprs[1].as_json_str().unwrap() + ); + + let dispatch_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + let subprocedure_ids = extract_subprocedure_ids(dispatch_status); + 
assert_eq!(subprocedure_ids.len(), 1); + assert_parent_state::(&procedure); + + let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external( + MockError::new(StatusCode::Internal), + ))); + let collect_ctx = procedure_context_with_receivers(HashMap::from([( + subprocedure_ids[0], + procedure_state_receiver(failed_state), + )])); + let err = procedure.execute(&collect_ctx).await.unwrap_err(); + assert!(!err.is_retry_later()); + assert_parent_state::(&procedure); + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + + assert!( + table_partition_key_indices(&procedure.context) + .await + .is_empty() + ); + assert_eq!( + current_parent_region_routes(&procedure.context).await, + vec![test_region_route(RegionId::new(table_id, 1), "")] + ); + } + + #[tokio::test] + async fn test_repartition_procedure_flow_unpartitioned_rollback_is_idempotent() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + + env.create_physical_table_metadata_for_repartition( + table_id, + vec![test_region_route(RegionId::new(table_id, 1), "")], + test_region_wal_options(&[1]), + ) + .await; + + let context = new_parent_context(&env, node_manager, table_id); + let mut procedure = RepartitionProcedure::new( + RepartitionFrom::Unpartitioned { + partition_columns: vec!["col1".to_string()], + }, + vec![range_expr("col1", 0, 50), range_expr("col1", 50, 100)], + context, + ); + + procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert_eq!( + table_partition_key_indices(&procedure.context).await, + vec![0] + ); + assert_eq!( + current_parent_region_routes(&procedure.context).await.len(), + 2 + ); + + let 
dispatch_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + let subprocedure_ids = extract_subprocedure_ids(dispatch_status); + assert_eq!(subprocedure_ids.len(), 1); + assert_parent_state::(&procedure); + + let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external( + MockError::new(StatusCode::Internal), + ))); + let collect_ctx = procedure_context_with_receivers(HashMap::from([( + subprocedure_ids[0], + procedure_state_receiver(failed_state), + )])); + let err = procedure.execute(&collect_ctx).await.unwrap_err(); + assert!(!err.is_retry_later()); + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + let once_indices = table_partition_key_indices(&procedure.context).await; + let once_routes = current_parent_region_routes(&procedure.context).await; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + let twice_indices = table_partition_key_indices(&procedure.context).await; + let twice_routes = current_parent_region_routes(&procedure.context).await; + + assert_eq!(once_indices, twice_indices); + assert_eq!(once_routes, twice_routes); + assert!(twice_indices.is_empty()); + assert_eq!( + twice_routes, + vec![test_region_route(RegionId::new(table_id, 1), "")] + ); + } + #[tokio::test] async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() { common_telemetry::init_default_ut_logging(); @@ -1852,7 +2235,9 @@ mod tests { let context = new_parent_context(&env, node_manager, table_id); let mut procedure = RepartitionProcedure::new( - vec![range_expr("x", 0, 100)], + RepartitionFrom::Partitioned { + exprs: vec![range_expr("x", 0, 100)], + }, vec![range_expr("x", 0, 50), range_expr("x", 50, 100)], context, ); diff --git a/src/meta-srv/src/procedure/repartition/repartition_start.rs b/src/meta-srv/src/procedure/repartition/repartition_start.rs index 6e14e6a0e6..b6f0ec9c0a 100644 --- 
a/src/meta-srv/src/procedure/repartition/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/repartition_start.rs @@ -20,7 +20,7 @@ use common_telemetry::debug; use partition::collider::Collider; use partition::expr::PartitionExpr; use partition::subtask::{self, RepartitionSubtask}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use tokio::time::Instant; use uuid::Uuid; @@ -29,20 +29,57 @@ use crate::error::{self, Result}; use crate::procedure::repartition::allocate_region::AllocateRegion; use crate::procedure::repartition::plan::{AllocationPlanEntry, SourceRegionDescriptor}; use crate::procedure::repartition::repartition_end::RepartitionEnd; +use crate::procedure::repartition::update_partition_metadata::{ + PartitionMetadataUpdate, UpdatePartitionMetadata, +}; use crate::procedure::repartition::{Context, State}; +#[derive(Debug, Clone, Serialize)] +pub enum RepartitionFrom { + Partitioned { exprs: Vec }, + Unpartitioned { partition_columns: Vec }, +} + +impl<'de> Deserialize<'de> for RepartitionFrom { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + enum CurrentRepartitionFrom { + Partitioned { exprs: Vec }, + Unpartitioned { partition_columns: Vec }, + } + + #[derive(Deserialize)] + #[serde(untagged)] + enum RepartitionFromRepr { + Current(CurrentRepartitionFrom), + Legacy(Vec), + } + + match RepartitionFromRepr::deserialize(deserializer)? 
{ + RepartitionFromRepr::Current(CurrentRepartitionFrom::Partitioned { exprs }) => { + Ok(Self::Partitioned { exprs }) + } + RepartitionFromRepr::Current(CurrentRepartitionFrom::Unpartitioned { + partition_columns, + }) => Ok(Self::Unpartitioned { partition_columns }), + RepartitionFromRepr::Legacy(exprs) => Ok(Self::Partitioned { exprs }), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RepartitionStart { - from_exprs: Vec, + #[serde(alias = "from_exprs")] + from: RepartitionFrom, to_exprs: Vec, } impl RepartitionStart { - pub fn new(from_exprs: Vec, to_exprs: Vec) -> Self { - Self { - from_exprs, - to_exprs, - } + pub fn new(from: RepartitionFrom, to_exprs: Vec) -> Self { + Self { from, to_exprs } } } @@ -54,6 +91,13 @@ impl State for RepartitionStart { ctx: &mut Context, _: &ProcedureContext, ) -> Result<(Box, Status)> { + ensure!( + !self.to_exprs.is_empty(), + error::InvalidArgumentsSnafu { + err_msg: "Repartition expects non-empty target partition expressions".to_string(), + } + ); + let timer = Instant::now(); let (physical_table_id, table_route) = ctx .table_metadata_manager @@ -72,7 +116,8 @@ impl State for RepartitionStart { } ); - let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?; + let from_exprs = self.prepare_from(ctx).await?; + let plans = Self::build_plan(&table_route, from_exprs, &self.to_exprs)?; let plan_count = plans.len(); let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum(); let total_target_regions: usize = @@ -91,10 +136,17 @@ impl State for RepartitionStart { return Ok((Box::new(RepartitionEnd), Status::done())); } - Ok(( - Box::new(AllocateRegion::new(plans)), - Status::executing(false), - )) + if ctx.persistent_ctx.partition_metadata_update.is_some() { + Ok(( + Box::new(UpdatePartitionMetadata::new(plans)), + Status::executing(true), + )) + } else { + Ok(( + Box::new(AllocateRegion::new(plans)), + Status::executing(false), + )) + } } fn as_any(&self) -> 
&dyn Any { @@ -103,6 +155,65 @@ impl State for RepartitionStart { } impl RepartitionStart { + async fn prepare_from<'a>(&'a self, ctx: &mut Context) -> Result<&'a [PartitionExpr]> { + match &self.from { + RepartitionFrom::Partitioned { exprs } => Ok(exprs), + RepartitionFrom::Unpartitioned { partition_columns } => { + Self::prepare_unpartitioned(ctx, partition_columns).await?; + Ok(&[]) + } + } + } + + async fn prepare_unpartitioned(ctx: &mut Context, partition_columns: &[String]) -> Result<()> { + if ctx.persistent_ctx.partition_metadata_update.is_some() { + return Ok(()); + } + + ensure!( + !partition_columns.is_empty(), + error::InvalidArgumentsSnafu { + err_msg: "Unpartitioned repartition expects non-empty partition columns" + .to_string(), + } + ); + + let table_info_value = ctx.get_table_info_value().await?; + ensure!( + table_info_value + .table_info + .meta + .partition_key_indices + .is_empty(), + error::InvalidArgumentsSnafu { + err_msg: format!( + "Unpartitioned repartition expects an unpartitioned table, but table {} has partition key indices: {:?}", + ctx.persistent_ctx.table_id, + table_info_value.table_info.meta.partition_key_indices + ), + } + ); + + let schema = &table_info_value.table_info.meta.schema; + let partition_key_indices = partition_columns + .iter() + .map(|column_name| { + schema.column_index_by_name(column_name).with_context(|| { + error::InvalidArgumentsSnafu { + err_msg: format!( + "Partition column {} not found in table {}", + column_name, ctx.persistent_ctx.table_id + ), + } + }) + }) + .collect::>>()?; + ctx.persistent_ctx.partition_metadata_update = + Some(PartitionMetadataUpdate::new(partition_key_indices)); + + Ok(()) + } + pub(crate) fn build_plan( physical_route: &PhysicalTableRouteValue, from_exprs: &[PartitionExpr], @@ -227,7 +338,6 @@ impl RepartitionStart { ), } ); - let source_region = &physical_route.region_routes[0].region; ensure!( source_region.partition_expr().is_empty(), @@ -247,20 +357,37 @@ impl RepartitionStart 
{ #[cfg(test)] mod tests { + use std::sync::Arc; + + use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler; use common_meta::key::table_route::PhysicalTableRouteValue; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; + use common_meta::test_util::MockDatanodeManager; use datatypes::prelude::Value; use partition::expr::{Operand, RestrictedOp}; use store_api::storage::RegionId; use super::*; - use crate::procedure::repartition::test_util::{range_expr, test_region_route}; + use crate::procedure::repartition::test_util::{ + TestingEnv, new_parent_context, range_expr, test_region_route, test_region_wal_options, + }; fn physical_route(region_routes: Vec) -> PhysicalTableRouteValue { PhysicalTableRouteValue::new(region_routes) } + async fn new_test_context(env: &TestingEnv, table_id: u32) -> Context { + env.create_physical_table_metadata_for_repartition( + table_id, + vec![test_region_route(RegionId::new(table_id, 1), "")], + test_region_wal_options(&[1]), + ) + .await; + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + new_parent_context(env, node_manager, table_id) + } + #[test] fn test_build_plan_with_default_source_region() { let table_id = 1024; @@ -367,4 +494,216 @@ mod tests { )] ); } + + #[test] + fn test_repartition_start_deserializes_legacy_from_exprs() { + let from_exprs = vec![range_expr("x", 0, 100)]; + let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)]; + let json = serde_json::json!({ + "from_exprs": from_exprs, + "to_exprs": to_exprs, + }) + .to_string(); + + let state: RepartitionStart = serde_json::from_str(&json).unwrap(); + + let RepartitionFrom::Partitioned { exprs } = state.from else { + panic!("expected partition source"); + }; + assert_eq!(exprs, vec![range_expr("x", 0, 100)]); + } + + #[test] + fn test_repartition_start_deserializes_current_from() { + let state = RepartitionStart::new( + RepartitionFrom::Unpartitioned { + partition_columns: 
vec!["col1".to_string()], + }, + vec![range_expr("col1", 0, 50)], + ); + let json = serde_json::to_string(&state).unwrap(); + + let state: RepartitionStart = serde_json::from_str(&json).unwrap(); + + let RepartitionFrom::Unpartitioned { partition_columns } = state.from else { + panic!("expected unpartitioned source"); + }; + assert_eq!(partition_columns, vec!["col1"]); + } + + #[tokio::test] + async fn test_partitioned_source_does_not_initialize_partition_metadata_update() { + let env = TestingEnv::new(); + let table_id = 1024; + env.create_physical_table_metadata_for_repartition( + table_id, + vec![test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + )], + test_region_wal_options(&[1]), + ) + .await; + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let mut ctx = new_parent_context(&env, node_manager, table_id); + let mut state = RepartitionStart::new( + RepartitionFrom::Partitioned { + exprs: vec![range_expr("x", 0, 100)], + }, + vec![range_expr("x", 0, 50), range_expr("x", 50, 100)], + ); + + let (next, status) = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap(); + + assert!(!status.need_persist()); + assert!(next.as_any().is::()); + assert!(ctx.persistent_ctx.partition_metadata_update.is_none()); + } + + #[tokio::test] + async fn test_unpartitioned_source_initializes_partition_metadata_update() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + let mut state = RepartitionStart::new( + RepartitionFrom::Unpartitioned { + partition_columns: vec!["col2".to_string(), "col1".to_string()], + }, + vec![range_expr("col2", 0, 50), range_expr("col2", 50, 100)], + ); + + let (next, status) = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap(); + + assert!(status.need_persist()); + assert!(next.as_any().is::()); + assert_eq!( + ctx.persistent_ctx + .partition_metadata_update + 
.as_ref() + .unwrap() + .partition_key_indices, + vec![2, 0] + ); + } + + #[tokio::test] + async fn test_unpartitioned_source_rejects_existing_partition_metadata() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + let current = ctx.get_raw_table_info_value().await.unwrap(); + let mut table_info = current.table_info.clone(); + table_info.meta.partition_key_indices = vec![0]; + ctx.update_table_info(&current, current.update(table_info)) + .await + .unwrap(); + let mut state = RepartitionStart::new( + RepartitionFrom::Unpartitioned { + partition_columns: vec!["col1".to_string()], + }, + vec![range_expr("col1", 0, 50)], + ); + + let err = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap_err(); + + assert!(err.to_string().contains("expects an unpartitioned table")); + assert!(ctx.persistent_ctx.partition_metadata_update.is_none()); + } + + #[tokio::test] + async fn test_repartition_start_rejects_empty_target_partition_exprs() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + let mut state = + RepartitionStart::new(RepartitionFrom::Partitioned { exprs: vec![] }, vec![]); + + let err = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap_err(); + + assert!( + err.to_string() + .contains("non-empty target partition expressions") + ); + } + + #[tokio::test] + async fn test_unpartitioned_source_rejects_empty_target_partition_exprs() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + let mut state = RepartitionStart::new( + RepartitionFrom::Unpartitioned { + partition_columns: vec!["col1".to_string()], + }, + vec![], + ); + + let err = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap_err(); + + assert!( + err.to_string() + .contains("non-empty target partition expressions") + ); +
assert!(ctx.persistent_ctx.partition_metadata_update.is_none()); + } + + #[tokio::test] + async fn test_unpartitioned_source_rejects_empty_partition_columns() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + let mut state = RepartitionStart::new( + RepartitionFrom::Unpartitioned { + partition_columns: vec![], + }, + vec![range_expr("col1", 0, 50)], + ); + + let err = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap_err(); + + assert!(err.to_string().contains("non-empty partition columns")); + assert!(ctx.persistent_ctx.partition_metadata_update.is_none()); + } + + #[tokio::test] + async fn test_unpartitioned_source_rejects_missing_partition_column() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + let mut state = RepartitionStart::new( + RepartitionFrom::Unpartitioned { + partition_columns: vec!["missing_col".to_string()], + }, + vec![range_expr("col1", 0, 50)], + ); + + let err = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap_err(); + + assert!( + err.to_string() + .contains("Partition column missing_col not found") + ); + assert!(ctx.persistent_ctx.partition_metadata_update.is_none()); + } } diff --git a/src/meta-srv/src/procedure/repartition/update_partition_metadata.rs b/src/meta-srv/src/procedure/repartition/update_partition_metadata.rs new file mode 100644 index 0000000000..cc9ca1c9bb --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/update_partition_metadata.rs @@ -0,0 +1,251 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_meta::lock_key::TableLock; +use common_procedure::{Context as ProcedureContext, Status}; +use serde::{Deserialize, Serialize}; +use snafu::ensure; + +use crate::error::{self, Result}; +use crate::procedure::repartition::allocate_region::AllocateRegion; +use crate::procedure::repartition::plan::AllocationPlanEntry; +use crate::procedure::repartition::{Context, State}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct PartitionMetadataUpdate { + pub partition_key_indices: Vec, +} + +impl PartitionMetadataUpdate { + pub fn new(partition_key_indices: Vec) -> Self { + Self { + partition_key_indices, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdatePartitionMetadata { + plan_entries: Vec, +} + +impl UpdatePartitionMetadata { + pub fn new(plan_entries: Vec) -> Self { + Self { plan_entries } + } +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for UpdatePartitionMetadata { + async fn next( + &mut self, + ctx: &mut Context, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let Some(update) = ctx.persistent_ctx.partition_metadata_update.as_ref() else { + return Ok(( + Box::new(AllocateRegion::new(self.plan_entries.clone())), + Status::executing(false), + )); + }; + let partition_key_indices = update.partition_key_indices.clone(); + ensure!( + !partition_key_indices.is_empty(), + error::InvalidArgumentsSnafu { + err_msg: + "Repartition partition metadata update expects non-empty partition key indices" + .to_string(), + } + ); + + 
let table_id = ctx.persistent_ctx.table_id; + let table_lock = TableLock::Write(table_id).into(); + let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; + let table_info_value = ctx.get_raw_table_info_value().await?; + let current_partition_key_indices = &table_info_value.table_info.meta.partition_key_indices; + if current_partition_key_indices == &partition_key_indices { + return Ok(( + Box::new(AllocateRegion::new(self.plan_entries.clone())), + Status::executing(true), + )); + } + ensure!( + current_partition_key_indices.is_empty(), + error::InvalidArgumentsSnafu { + err_msg: format!( + "Repartition partition metadata update expects an unpartitioned table, but table {} has partition key indices: {:?}", + table_id, current_partition_key_indices + ), + } + ); + + let mut new_table_info = table_info_value.table_info.clone(); + new_table_info.meta.partition_key_indices = partition_key_indices; + ctx.update_table_info(&table_info_value, table_info_value.update(new_table_info)) + .await?; + // We don't invalidate cache here because the subsequent AllocateRegion step + // will update the table route and invalidate the cache accordingly. 
+ + Ok(( + Box::new(AllocateRegion::new(self.plan_entries.clone())), + Status::executing(true), + )) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler; + use common_meta::test_util::MockDatanodeManager; + use store_api::storage::{RegionId, TableId}; + + use super::*; + use crate::procedure::repartition::test_util::{ + TestingEnv, new_parent_context, range_expr, test_region_route, test_region_wal_options, + }; + + async fn new_test_context(env: &TestingEnv, table_id: TableId) -> Context { + env.create_physical_table_metadata_for_repartition( + table_id, + vec![test_region_route(RegionId::new(table_id, 1), "")], + test_region_wal_options(&[1]), + ) + .await; + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let mut ctx = new_parent_context(env, node_manager, table_id); + ctx.persistent_ctx.partition_metadata_update = Some(PartitionMetadataUpdate::new(vec![0])); + ctx + } + + async fn set_partition_key_indices(ctx: &Context, partition_key_indices: Vec) { + let current = ctx.get_raw_table_info_value().await.unwrap(); + let mut table_info = current.table_info.clone(); + table_info.meta.partition_key_indices = partition_key_indices; + ctx.update_table_info(&current, current.update(table_info)) + .await + .unwrap(); + } + + async fn partition_key_indices(ctx: &Context) -> Vec { + ctx.get_table_info_value() + .await + .unwrap() + .table_info + .meta + .partition_key_indices + } + + #[tokio::test] + async fn test_update_partition_metadata_applies_to_unpartitioned_table() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + let mut state = UpdatePartitionMetadata::new(vec![]); + + let (next, status) = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap(); + + assert!(status.need_persist()); + assert!(next.as_any().is::()); +
assert_eq!(partition_key_indices(&ctx).await, vec![0]); + } + + #[tokio::test] + async fn test_update_partition_metadata_replay_is_noop() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + set_partition_key_indices(&ctx, vec![0]).await; + let mut state = UpdatePartitionMetadata::new(vec![]); + + let (next, status) = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap(); + + assert!(status.need_persist()); + assert!(next.as_any().is::()); + assert_eq!(partition_key_indices(&ctx).await, vec![0]); + } + + #[tokio::test] + async fn test_update_partition_metadata_rejects_empty_partition_key_indices() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + ctx.persistent_ctx.partition_metadata_update = Some(PartitionMetadataUpdate::new(vec![])); + let mut state = UpdatePartitionMetadata::new(vec![]); + + let err = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap_err(); + + assert!(err.to_string().contains("non-empty partition key indices")); + assert!(partition_key_indices(&ctx).await.is_empty()); + } + + #[tokio::test] + async fn test_update_partition_metadata_rejects_other_partition_keys() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + set_partition_key_indices(&ctx, vec![1]).await; + let mut state = UpdatePartitionMetadata::new(vec![]); + + let err = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap_err(); + + assert!(err.to_string().contains("expects an unpartitioned table")); + assert_eq!(partition_key_indices(&ctx).await, vec![1]); + } + + #[tokio::test] + async fn test_update_partition_metadata_preserves_plan_entries() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut ctx = new_test_context(&env, table_id).await; + let plan_entries = 
vec![crate::procedure::repartition::plan::AllocationPlanEntry { + group_id: uuid::Uuid::new_v4(), + source_regions: vec![ + crate::procedure::repartition::plan::SourceRegionDescriptor::Default { + region_id: RegionId::new(table_id, 1), + }, + ], + target_partition_exprs: vec![range_expr("x", 0, 10)], + transition_map: vec![vec![0]], + }]; + let mut state = UpdatePartitionMetadata::new(plan_entries); + + let (next, _) = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap(); + + assert!(next.as_any().is::()); + } +} diff --git a/src/standalone/src/procedure.rs b/src/standalone/src/procedure.rs index 144a56be44..853221e698 100644 --- a/src/standalone/src/procedure.rs +++ b/src/standalone/src/procedure.rs @@ -17,7 +17,7 @@ use std::time::Duration; use common_error::ext::BoxedError; use common_meta::ddl::DdlContext; -use common_meta::ddl_manager::RepartitionProcedureFactory; +use common_meta::ddl_manager::{RepartitionProcedureFactory, RepartitionSource}; use common_meta::key::runtime_switch::RuntimeSwitchManager; use common_meta::kv_backend::KvBackendRef; use common_meta::state_store::KvStateStore; @@ -66,7 +66,7 @@ impl RepartitionProcedureFactory for StandaloneRepartitionProcedureFactory { _ddl_ctx: &DdlContext, _table_name: TableName, _table_id: TableId, - _from_exprs: Vec, + _source: RepartitionSource, _to_exprs: Vec, _timeout: Option, ) -> std::result::Result { diff --git a/tests-integration/tests/repartition.rs b/tests-integration/tests/repartition.rs index 50893cc7a6..ef59d1b910 100644 --- a/tests-integration/tests/repartition.rs +++ b/tests-integration/tests/repartition.rs @@ -55,6 +55,24 @@ macro_rules! 
repartition_tests { } } + #[tokio::test(flavor = "multi_thread")] + async fn [< test_partition_unpartitioned_mito >]() { + let store_type = tests_integration::test_util::StorageType::$service; + if store_type.test_on() { + common_telemetry::init_default_ut_logging(); + $crate::repartition::test_partition_unpartitioned_mito(store_type).await; + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn [< test_partition_unpartitioned_metric >]() { + let store_type = tests_integration::test_util::StorageType::$service; + if store_type.test_on() { + common_telemetry::init_default_ut_logging(); + $crate::repartition::test_partition_unpartitioned_metric(store_type).await; + } + } + #[tokio::test(flavor = "multi_thread")] async fn [< test_repartition_metric >]() { let store_type = tests_integration::test_util::StorageType::$service; @@ -78,6 +96,274 @@ macro_rules! repartition_tests { }; } +pub async fn test_partition_unpartitioned_mito(store_type: StorageType) { + info!( + "test_partition_unpartitioned_mito: store_type: {:?}", + store_type + ); + let cluster_name = "test_partition_unpartitioned_mito"; + let (store_config, _guard) = get_test_store_config(&store_type); + let datanodes = 3u64; + let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await; + if matches!(store_type, StorageType::File) { + let home_dir = create_temp_dir("test_partition_unpartitioned_mito_data_home"); + builder = builder.with_shared_home_dir(Arc::new(home_dir)); + } + + let cluster = builder + .with_datanodes(datanodes as u32) + .with_store_config(store_config) + .with_datanode_wal_config(DatanodeWalConfig::Noop) + .build(true) + .await; + + let query_ctx = QueryContext::arc(); + let instance = cluster.fe_instance(); + + let sql = r#" + CREATE TABLE `partition_unpartitioned_mito_table`( + `id` INT, + `city` STRING, + `ts` TIMESTAMP TIME INDEX, + PRIMARY KEY(`id`, `city`) + ) ENGINE = mito; + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = r#" + INSERT 
INTO `partition_unpartitioned_mito_table` VALUES + (1, 'New York', '2022-01-01 00:00:00'), + (10, 'Paris', '2022-01-01 00:00:00'), + (20, 'Beijing', '2022-01-01 00:00:00'); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = r#" + ALTER TABLE `partition_unpartitioned_mito_table` PARTITION ON COLUMNS (`id`) ( + `id` < 10, + `id` >= 10 AND `id` < 20, + `id` >= 20 + ); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + // Wait for cache invalidation. + tokio::time::sleep(Duration::from_millis(500)).await; + + let result = run_sql( + instance, + "SELECT * FROM `partition_unpartitioned_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected = "\ ++----+----------+---------------------+ +| id | city | ts | ++----+----------+---------------------+ +| 1 | New York | 2022-01-01T00:00:00 | +| 10 | Paris | 2022-01-01T00:00:00 | +| 20 | Beijing | 2022-01-01T00:00:00 | ++----+----------+---------------------+"; + check_output_stream(result.data, expected).await; + + let result = run_sql( + instance, + "\ +SELECT partition_expression, partition_description \ +FROM information_schema.partitions \ +WHERE table_name = 'partition_unpartitioned_mito_table' \ +ORDER BY partition_ordinal_position;", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected_partitions = r#"+----------------------+-----------------------+ +| partition_expression | partition_description | ++----------------------+-----------------------+ +| id | id < 10 | +| id | id >= 10 AND id < 20 | +| id | id >= 20 | ++----------------------+-----------------------+"#; + check_output_stream(result.data, expected_partitions).await; + + let sql = r#" + INSERT INTO `partition_unpartitioned_mito_table` VALUES + (5, 'London', '2022-01-02 00:00:00'), + (15, 'Tokyo', '2022-01-02 00:00:00'), + (25, 'Shanghai', '2022-01-02 00:00:00'); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + 
"SELECT * FROM `partition_unpartitioned_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected = "\ ++----+----------+---------------------+ +| id | city | ts | ++----+----------+---------------------+ +| 1 | New York | 2022-01-01T00:00:00 | +| 5 | London | 2022-01-02T00:00:00 | +| 10 | Paris | 2022-01-01T00:00:00 | +| 15 | Tokyo | 2022-01-02T00:00:00 | +| 20 | Beijing | 2022-01-01T00:00:00 | +| 25 | Shanghai | 2022-01-02T00:00:00 | ++----+----------+---------------------+"; + check_output_stream(result.data, expected).await; + + run_sql( + instance, + "DROP TABLE `partition_unpartitioned_mito_table`", + query_ctx.clone(), + ) + .await + .unwrap(); +} + +pub async fn test_partition_unpartitioned_metric(store_type: StorageType) { + info!( + "test_partition_unpartitioned_metric: store_type: {:?}", + store_type + ); + let cluster_name = "test_partition_unpartitioned_metric"; + let (store_config, _guard) = get_test_store_config(&store_type); + let datanodes = 3u64; + let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await; + if matches!(store_type, StorageType::File) { + let home_dir = create_temp_dir("test_partition_unpartitioned_metric_data_home"); + builder = builder.with_shared_home_dir(Arc::new(home_dir)); + } + + let cluster = builder + .with_datanodes(datanodes as u32) + .with_store_config(store_config) + .with_datanode_wal_config(DatanodeWalConfig::Noop) + .build(true) + .await; + + let query_ctx = QueryContext::arc(); + let instance = cluster.fe_instance(); + + let sql = r#" + CREATE TABLE `partition_unpartitioned_metric_phy`( + `ts` TIMESTAMP TIME INDEX, + `val` DOUBLE, + `host` STRING PRIMARY KEY + ) ENGINE = metric + WITH ( + "physical_metric_table" = "true" + ); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = r#" + CREATE TABLE `partition_unpartitioned_metric_log`( + `ts` TIMESTAMP TIME INDEX, + `val` DOUBLE, + `host` STRING PRIMARY KEY + ) ENGINE = metric + WITH ( + 
"on_physical_table" = "partition_unpartitioned_metric_phy" + ); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = r#" + INSERT INTO `partition_unpartitioned_metric_log` (`host`, `ts`, `val`) VALUES + ('a_host', '2022-01-01 00:00:00', 1), + ('z_host', '2022-01-01 00:00:00', 2); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = r#" + ALTER TABLE `partition_unpartitioned_metric_phy` PARTITION ON COLUMNS (`host`) ( + `host` < 'm', + `host` >= 'm' + ); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + // Wait for cache invalidation. + tokio::time::sleep(Duration::from_millis(500)).await; + + let result = run_sql( + instance, + "SELECT * FROM `partition_unpartitioned_metric_log` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected = "\ ++--------+---------------------+-----+ +| host | ts | val | ++--------+---------------------+-----+ +| a_host | 2022-01-01T00:00:00 | 1.0 | +| z_host | 2022-01-01T00:00:00 | 2.0 | ++--------+---------------------+-----+"; + check_output_stream(result.data, expected).await; + + let result = run_sql( + instance, + "\ +SELECT partition_expression, partition_description \ +FROM information_schema.partitions \ +WHERE table_name = 'partition_unpartitioned_metric_phy' \ +ORDER BY partition_ordinal_position;", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected_partitions = r#"+----------------------+-----------------------+ +| partition_expression | partition_description | ++----------------------+-----------------------+ +| host | host < m | +| host | host >= m | ++----------------------+-----------------------+"#; + check_output_stream(result.data, expected_partitions).await; + + let sql = r#" + INSERT INTO `partition_unpartitioned_metric_log` (`host`, `ts`, `val`) VALUES + ('b_host', '2022-01-02 00:00:00', 3), + ('x_host', '2022-01-02 00:00:00', 4); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result 
= run_sql( + instance, + "SELECT * FROM `partition_unpartitioned_metric_log` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected = "\ ++--------+---------------------+-----+ +| host | ts | val | ++--------+---------------------+-----+ +| a_host | 2022-01-01T00:00:00 | 1.0 | +| b_host | 2022-01-02T00:00:00 | 3.0 | +| x_host | 2022-01-02T00:00:00 | 4.0 | +| z_host | 2022-01-01T00:00:00 | 2.0 | ++--------+---------------------+-----+"; + check_output_stream(result.data, expected).await; + + run_sql( + instance, + "DROP TABLE `partition_unpartitioned_metric_log`", + query_ctx.clone(), + ) + .await + .unwrap(); + run_sql( + instance, + "DROP TABLE `partition_unpartitioned_metric_phy`", + query_ctx.clone(), + ) + .await + .unwrap(); +} + async fn trigger_table_gc(metasrv: &Arc, table_name: &str) { info!("triggering table gc for table: {}", table_name); let table_metadata_manager = metasrv.table_metadata_manager();