diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index f07e2c2f6f..0ca68cde42 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -113,15 +113,19 @@ impl TableMetadataAllocator { table_id: TableId, task: &CreateTableTask, ) -> Result { - let regions = task.partitions.len(); + let num_regions = task + .partitions + .as_ref() + .map(|p| p.value_list.len()) + .unwrap_or(1); ensure!( - regions > 0, + num_regions > 0, error::UnexpectedSnafu { err_msg: "The number of partitions must be greater than 0" } ); - let peers = self.peer_allocator.alloc(regions).await?; + let peers = self.peer_allocator.alloc(num_regions).await?; debug!("Allocated peers {:?} for table {}", peers, table_id); let region_routes = task .partitions diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index 24fa5522fa..2bc8a45b0f 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -21,7 +21,6 @@ pub mod flownode_handler; use std::assert_matches::assert_matches; use std::collections::HashMap; -use api::v1::meta::Partition; use api::v1::{ColumnDataType, SemanticType}; use common_procedure::Status; use datatypes::prelude::ConcreteDataType; @@ -145,10 +144,7 @@ pub fn test_create_logical_table_task(name: &str) -> CreateTableTask { CreateTableTask { create_table, // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], + partitions: None, table_info, } } @@ -183,10 +179,7 @@ pub fn test_create_physical_table_task(name: &str) -> CreateTableTask { CreateTableTask { create_table, // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], + partitions: None, table_info, } } diff --git a/src/common/meta/src/ddl/test_util/create_table.rs b/src/common/meta/src/ddl/test_util/create_table.rs index b0b7b3a5c9..afd4b06833 100644 --- a/src/common/meta/src/ddl/test_util/create_table.rs +++ b/src/common/meta/src/ddl/test_util/create_table.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use api::v1::column_def::try_as_column_schema; -use api::v1::meta::Partition; use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType}; use chrono::DateTime; use common_catalog::consts::{ @@ -175,10 +174,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask CreateTableTask { create_table, // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], + partitions: None, table_info, } } diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index 8e8d70957d..d1fd74c6fb 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use api::region::RegionResponse; -use api::v1::meta::{Partition, Peer}; +use api::v1::meta::Peer; use api::v1::region::{region_request, RegionRequest}; use api::v1::{ColumnDataType, SemanticType}; use common_error::ext::ErrorExt; @@ -141,10 +141,7 @@ pub(crate) fn test_create_table_task(name: &str) -> CreateTableTask { CreateTableTask { create_table, // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], + partitions: None, table_info, } } @@ -218,7 +215,7 @@ async fn test_on_prepare_with_no_partition_err() { let node_manager = Arc::new(MockDatanodeManager::new(())); let ddl_context = new_ddl_context(node_manager); let mut task = test_create_table_task("foo"); - task.partitions = vec![]; + task.partitions = None; task.create_table.create_if_not_exists = true; let mut procedure = CreateTableProcedure::new(task, ddl_context); let err = procedure.on_prepare().await.unwrap_err(); diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 2eb8468599..62c69aef60 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -96,7 +96,7 @@ impl DdlTask { /// Creates a [`DdlTask`] to create a table. pub fn new_create_table( expr: CreateTableExpr, - partitions: Vec, + partitions: Option, table_info: RawTableInfo, ) -> Self { DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info)) @@ -107,7 +107,7 @@ impl DdlTask { DdlTask::CreateLogicalTables( table_data .into_iter() - .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info)) + .map(|(expr, table_info)| CreateTableTask::new(expr, None, table_info)) .collect(), ) } @@ -606,7 +606,10 @@ impl From for PbDropTableTask { #[derive(Debug, PartialEq, Clone)] pub struct CreateTableTask { pub create_table: CreateTableExpr, - pub partitions: Vec, + /// The partitions of the table. + /// + /// If the table is created with a single region (not partitioned), this field is `None`. + pub partitions: Option, pub table_info: RawTableInfo, } @@ -620,7 +623,7 @@ impl TryFrom for CreateTableTask { pb.create_table.context(error::InvalidProtoMsgSnafu { err_msg: "expected create table", })?, - pb.partitions, + pb.partitions.first().cloned(), table_info, )) } @@ -633,7 +636,10 @@ impl TryFrom for PbCreateTableTask { Ok(PbCreateTableTask { table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?, create_table: Some(task.create_table), - partitions: task.partitions, + partitions: match task.partitions { + Some(p) => vec![p], + None => vec![], + }, }) } } @@ -641,7 +647,7 @@ impl TryFrom for PbCreateTableTask { impl CreateTableTask { pub fn new( expr: CreateTableExpr, - partitions: Vec, + partitions: Option, table_info: RawTableInfo, ) -> CreateTableTask { CreateTableTask { @@ -701,7 +707,10 @@ impl Serialize for CreateTableTask { let pb = PbCreateTableTask { create_table: Some(self.create_table.clone()), - partitions: self.partitions.clone(), + partitions: match &self.partitions { + Some(p) => vec![p.clone()], + None => vec![], + }, table_info, }; let buf = pb.encode_to_vec(); @@ -1315,7 +1324,7 @@ mod tests { let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema)); let task = CreateTableTask::new( CreateTableExpr::default(), - Vec::new(), + None, RawTableInfo::from(table_info), ); @@ -1411,8 +1420,7 @@ mod tests { ..Default::default() }; - let mut create_table_task = - CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info); + let mut create_table_task = CreateTableTask::new(create_table_expr, None, raw_table_info); // Call the sort_columns method create_table_task.sort_columns(); diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index b7d14202f9..71b58fe697 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -391,6 +391,9 @@ impl From for PbRegion { } } +/// Serialized version of `PartitionDef`. +/// +/// Represent the entire partition part of one table #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] pub struct Partition { #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")] diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index eda7d2c25a..f7dde90000 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -15,7 +15,6 @@ use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex}; -use api::v1::meta::Partition; use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef}; use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType}; @@ -84,14 +83,7 @@ fn create_table_task(table_name: Option<&str>) -> CreateTableTask { .into(); let table_info = build_raw_table_info_from_expr(&expr); - CreateTableTask::new( - expr, - vec![Partition { - column_list: vec![], - value_list: vec![], - }], - table_info, - ) + CreateTableTask::new(expr, None, table_info) } #[test] diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index d57f575ce6..252bd93827 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -1389,12 +1389,11 @@ impl StatementExecutor { async fn create_table_procedure( &self, create_table: CreateTableExpr, - partitions: Vec, + partitions: Option, table_info: RawTableInfo, query_context: QueryContextRef, ) -> Result { - let partitions = partitions.into_iter().map(Into::into).collect(); - + let partitions = partitions.map(|p| p.into()); // to PbPartition let request = SubmitDdlTaskRequest { query_context, task: DdlTask::new_create_table(create_table, partitions, table_info), @@ -1590,7 +1589,7 @@ fn parse_partitions( create_table: &CreateTableExpr, partitions: Option, query_ctx: &QueryContextRef, -) -> Result<(Vec, Vec)> { +) -> Result<(Option, Vec)> { // If partitions are not defined by user, use the timestamp column (which has to be existed) as // the partition column, and create only one partition. let partition_columns = find_partition_columns(&partitions)?; @@ -1600,23 +1599,26 @@ fn parse_partitions( // Validates partition let mut exprs = vec![]; for partition in &partition_entries { - for bound in partition { - if let PartitionBound::Expr(expr) = bound { - exprs.push(expr.clone()); - } + if let PartitionBound::Expr(expr) = partition { + exprs.push(expr.clone()); } } MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true) .context(InvalidPartitionSnafu)?; - Ok(( - partition_entries - .into_iter() - .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x))) - .collect::>() + let meta_partition = if partition_entries.is_empty() { + None + } else { + Some( + MetaPartition::try_from(PartitionDef::new( + partition_columns.clone(), + partition_entries, + )) .context(DeserializePartitionSnafu)?, - partition_columns, - )) + ) + }; + + Ok((meta_partition, partition_columns)) } fn create_table_info( @@ -1727,7 +1729,7 @@ fn find_partition_entries( partitions: &Option, partition_columns: &[String], query_ctx: &QueryContextRef, -) -> Result>> { +) -> Result> { let entries = if let Some(partitions) = partitions { // extract concrete data type of partition columns let column_defs = partition_columns @@ -1756,17 +1758,17 @@ fn find_partition_entries( for partition in &partitions.exprs { let partition_expr = convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?; - partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]); + partition_exprs.push(PartitionBound::Expr(partition_expr)); } // fallback for no expr if partition_exprs.is_empty() { - partition_exprs.push(vec![PartitionBound::MaxValue]); + partition_exprs.push(PartitionBound::MaxValue); } partition_exprs } else { - vec![vec![PartitionBound::MaxValue]] + vec![PartitionBound::MaxValue] }; Ok(entries) } diff --git a/src/partition/src/partition.rs b/src/partition/src/partition.rs index 8095e4b584..5c64160d45 100644 --- a/src/partition/src/partition.rs +++ b/src/partition/src/partition.rs @@ -48,17 +48,21 @@ pub trait PartitionRule: Sync + Send { ) -> Result>; } -/// The right bound(exclusive) of partition range. +/// The bound of one partition. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub enum PartitionBound { + /// Deprecated since 0.9.0. Value(Value), + /// Deprecated since 0.15.0. MaxValue, Expr(crate::expr::PartitionExpr), } +/// The partition definition of one table. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PartitionDef { partition_columns: Vec, + /// Each element represents one partition. partition_bounds: Vec, }