Compare commits

...

1 Commit

Author SHA1 Message Date
Ruihang Xia
afc3f88240 only use one optional Partition definition on creating table
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-07-16 06:44:55 +08:00
9 changed files with 61 additions and 62 deletions

View File

@@ -113,15 +113,19 @@ impl TableMetadataAllocator {
table_id: TableId, table_id: TableId,
task: &CreateTableTask, task: &CreateTableTask,
) -> Result<PhysicalTableRouteValue> { ) -> Result<PhysicalTableRouteValue> {
let regions = task.partitions.len(); let num_regions = task
.partitions
.as_ref()
.map(|p| p.value_list.len())
.unwrap_or(1);
ensure!( ensure!(
regions > 0, num_regions > 0,
error::UnexpectedSnafu { error::UnexpectedSnafu {
err_msg: "The number of partitions must be greater than 0" err_msg: "The number of partitions must be greater than 0"
} }
); );
let peers = self.peer_allocator.alloc(regions).await?; let peers = self.peer_allocator.alloc(num_regions).await?;
debug!("Allocated peers {:?} for table {}", peers, table_id); debug!("Allocated peers {:?} for table {}", peers, table_id);
let region_routes = task let region_routes = task
.partitions .partitions

View File

@@ -21,7 +21,6 @@ pub mod flownode_handler;
use std::assert_matches::assert_matches; use std::assert_matches::assert_matches;
use std::collections::HashMap; use std::collections::HashMap;
use api::v1::meta::Partition;
use api::v1::{ColumnDataType, SemanticType}; use api::v1::{ColumnDataType, SemanticType};
use common_procedure::Status; use common_procedure::Status;
use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ConcreteDataType;
@@ -145,10 +144,7 @@ pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
CreateTableTask { CreateTableTask {
create_table, create_table,
// Single region // Single region
partitions: vec![Partition { partitions: None,
column_list: vec![],
value_list: vec![],
}],
table_info, table_info,
} }
} }
@@ -183,10 +179,7 @@ pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
CreateTableTask { CreateTableTask {
create_table, create_table,
// Single region // Single region
partitions: vec![Partition { partitions: None,
column_list: vec![],
value_list: vec![],
}],
table_info, table_info,
} }
} }

View File

@@ -15,7 +15,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use api::v1::column_def::try_as_column_schema; use api::v1::column_def::try_as_column_schema;
use api::v1::meta::Partition;
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType}; use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
use chrono::DateTime; use chrono::DateTime;
use common_catalog::consts::{ use common_catalog::consts::{
@@ -175,10 +174,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask
CreateTableTask { CreateTableTask {
create_table, create_table,
// Single region // Single region
partitions: vec![Partition { partitions: None,
column_list: vec![],
value_list: vec![],
}],
table_info, table_info,
} }
} }

View File

@@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use api::region::RegionResponse; use api::region::RegionResponse;
use api::v1::meta::{Partition, Peer}; use api::v1::meta::Peer;
use api::v1::region::{region_request, RegionRequest}; use api::v1::region::{region_request, RegionRequest};
use api::v1::{ColumnDataType, SemanticType}; use api::v1::{ColumnDataType, SemanticType};
use common_error::ext::ErrorExt; use common_error::ext::ErrorExt;
@@ -141,10 +141,7 @@ pub(crate) fn test_create_table_task(name: &str) -> CreateTableTask {
CreateTableTask { CreateTableTask {
create_table, create_table,
// Single region // Single region
partitions: vec![Partition { partitions: None,
column_list: vec![],
value_list: vec![],
}],
table_info, table_info,
} }
} }
@@ -218,7 +215,7 @@ async fn test_on_prepare_with_no_partition_err() {
let node_manager = Arc::new(MockDatanodeManager::new(())); let node_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(node_manager); let ddl_context = new_ddl_context(node_manager);
let mut task = test_create_table_task("foo"); let mut task = test_create_table_task("foo");
task.partitions = vec![]; task.partitions = None;
task.create_table.create_if_not_exists = true; task.create_table.create_if_not_exists = true;
let mut procedure = CreateTableProcedure::new(task, ddl_context); let mut procedure = CreateTableProcedure::new(task, ddl_context);
let err = procedure.on_prepare().await.unwrap_err(); let err = procedure.on_prepare().await.unwrap_err();

View File

@@ -96,7 +96,7 @@ impl DdlTask {
/// Creates a [`DdlTask`] to create a table. /// Creates a [`DdlTask`] to create a table.
pub fn new_create_table( pub fn new_create_table(
expr: CreateTableExpr, expr: CreateTableExpr,
partitions: Vec<Partition>, partitions: Option<Partition>,
table_info: RawTableInfo, table_info: RawTableInfo,
) -> Self { ) -> Self {
DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info)) DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
@@ -107,7 +107,7 @@ impl DdlTask {
DdlTask::CreateLogicalTables( DdlTask::CreateLogicalTables(
table_data table_data
.into_iter() .into_iter()
.map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info)) .map(|(expr, table_info)| CreateTableTask::new(expr, None, table_info))
.collect(), .collect(),
) )
} }
@@ -606,7 +606,10 @@ impl From<DropTableTask> for PbDropTableTask {
#[derive(Debug, PartialEq, Clone)] #[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask { pub struct CreateTableTask {
pub create_table: CreateTableExpr, pub create_table: CreateTableExpr,
pub partitions: Vec<Partition>, /// The partitions of the table.
///
/// If the table is created with a single region (not partitioned), this field is `None`.
pub partitions: Option<Partition>,
pub table_info: RawTableInfo, pub table_info: RawTableInfo,
} }
@@ -620,7 +623,7 @@ impl TryFrom<PbCreateTableTask> for CreateTableTask {
pb.create_table.context(error::InvalidProtoMsgSnafu { pb.create_table.context(error::InvalidProtoMsgSnafu {
err_msg: "expected create table", err_msg: "expected create table",
})?, })?,
pb.partitions, pb.partitions.first().cloned(),
table_info, table_info,
)) ))
} }
@@ -633,7 +636,10 @@ impl TryFrom<CreateTableTask> for PbCreateTableTask {
Ok(PbCreateTableTask { Ok(PbCreateTableTask {
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?, table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
create_table: Some(task.create_table), create_table: Some(task.create_table),
partitions: task.partitions, partitions: match task.partitions {
Some(p) => vec![p],
None => vec![],
},
}) })
} }
} }
@@ -641,7 +647,7 @@ impl TryFrom<CreateTableTask> for PbCreateTableTask {
impl CreateTableTask { impl CreateTableTask {
pub fn new( pub fn new(
expr: CreateTableExpr, expr: CreateTableExpr,
partitions: Vec<Partition>, partitions: Option<Partition>,
table_info: RawTableInfo, table_info: RawTableInfo,
) -> CreateTableTask { ) -> CreateTableTask {
CreateTableTask { CreateTableTask {
@@ -701,7 +707,10 @@ impl Serialize for CreateTableTask {
let pb = PbCreateTableTask { let pb = PbCreateTableTask {
create_table: Some(self.create_table.clone()), create_table: Some(self.create_table.clone()),
partitions: self.partitions.clone(), partitions: match &self.partitions {
Some(p) => vec![p.clone()],
None => vec![],
},
table_info, table_info,
}; };
let buf = pb.encode_to_vec(); let buf = pb.encode_to_vec();
@@ -1315,7 +1324,7 @@ mod tests {
let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema)); let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
let task = CreateTableTask::new( let task = CreateTableTask::new(
CreateTableExpr::default(), CreateTableExpr::default(),
Vec::new(), None,
RawTableInfo::from(table_info), RawTableInfo::from(table_info),
); );
@@ -1411,8 +1420,7 @@ mod tests {
..Default::default() ..Default::default()
}; };
let mut create_table_task = let mut create_table_task = CreateTableTask::new(create_table_expr, None, raw_table_info);
CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
// Call the sort_columns method // Call the sort_columns method
create_table_task.sort_columns(); create_table_task.sort_columns();

View File

@@ -391,6 +391,9 @@ impl From<Region> for PbRegion {
} }
} }
/// Serialized version of `PartitionDef`.
///
/// Represents the entire partition definition of one table.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Partition { pub struct Partition {
#[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")] #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]

View File

@@ -15,7 +15,6 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use api::v1::meta::Partition;
use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef}; use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef};
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType}; use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType};
@@ -84,14 +83,7 @@ fn create_table_task(table_name: Option<&str>) -> CreateTableTask {
.into(); .into();
let table_info = build_raw_table_info_from_expr(&expr); let table_info = build_raw_table_info_from_expr(&expr);
CreateTableTask::new( CreateTableTask::new(expr, None, table_info)
expr,
vec![Partition {
column_list: vec![],
value_list: vec![],
}],
table_info,
)
} }
#[test] #[test]

View File

@@ -1389,12 +1389,11 @@ impl StatementExecutor {
async fn create_table_procedure( async fn create_table_procedure(
&self, &self,
create_table: CreateTableExpr, create_table: CreateTableExpr,
partitions: Vec<Partition>, partitions: Option<Partition>,
table_info: RawTableInfo, table_info: RawTableInfo,
query_context: QueryContextRef, query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> { ) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions.into_iter().map(Into::into).collect(); let partitions = partitions.map(|p| p.into()); // to PbPartition
let request = SubmitDdlTaskRequest { let request = SubmitDdlTaskRequest {
query_context, query_context,
task: DdlTask::new_create_table(create_table, partitions, table_info), task: DdlTask::new_create_table(create_table, partitions, table_info),
@@ -1590,7 +1589,7 @@ fn parse_partitions(
create_table: &CreateTableExpr, create_table: &CreateTableExpr,
partitions: Option<Partitions>, partitions: Option<Partitions>,
query_ctx: &QueryContextRef, query_ctx: &QueryContextRef,
) -> Result<(Vec<MetaPartition>, Vec<String>)> { ) -> Result<(Option<MetaPartition>, Vec<String>)> {
// If partitions are not defined by the user, use the timestamp column (which must exist) as // If partitions are not defined by the user, use the timestamp column (which must exist) as
// the partition column, and create only one partition. // the partition column, and create only one partition.
let partition_columns = find_partition_columns(&partitions)?; let partition_columns = find_partition_columns(&partitions)?;
@@ -1600,23 +1599,26 @@ fn parse_partitions(
// Validates partition // Validates partition
let mut exprs = vec![]; let mut exprs = vec![];
for partition in &partition_entries { for partition in &partition_entries {
for bound in partition { if let PartitionBound::Expr(expr) = partition {
if let PartitionBound::Expr(expr) = bound { exprs.push(expr.clone());
exprs.push(expr.clone());
}
} }
} }
MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true) MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
.context(InvalidPartitionSnafu)?; .context(InvalidPartitionSnafu)?;
Ok(( let meta_partition = if partition_entries.is_empty() {
partition_entries None
.into_iter() } else {
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x))) Some(
.collect::<std::result::Result<_, _>>() MetaPartition::try_from(PartitionDef::new(
partition_columns.clone(),
partition_entries,
))
.context(DeserializePartitionSnafu)?, .context(DeserializePartitionSnafu)?,
partition_columns, )
)) };
Ok((meta_partition, partition_columns))
} }
fn create_table_info( fn create_table_info(
@@ -1727,7 +1729,7 @@ fn find_partition_entries(
partitions: &Option<Partitions>, partitions: &Option<Partitions>,
partition_columns: &[String], partition_columns: &[String],
query_ctx: &QueryContextRef, query_ctx: &QueryContextRef,
) -> Result<Vec<Vec<PartitionBound>>> { ) -> Result<Vec<PartitionBound>> {
let entries = if let Some(partitions) = partitions { let entries = if let Some(partitions) = partitions {
// extract concrete data type of partition columns // extract concrete data type of partition columns
let column_defs = partition_columns let column_defs = partition_columns
@@ -1756,17 +1758,17 @@ fn find_partition_entries(
for partition in &partitions.exprs { for partition in &partitions.exprs {
let partition_expr = let partition_expr =
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?; convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]); partition_exprs.push(PartitionBound::Expr(partition_expr));
} }
// fallback for no expr // fallback for no expr
if partition_exprs.is_empty() { if partition_exprs.is_empty() {
partition_exprs.push(vec![PartitionBound::MaxValue]); partition_exprs.push(PartitionBound::MaxValue);
} }
partition_exprs partition_exprs
} else { } else {
vec![vec![PartitionBound::MaxValue]] vec![PartitionBound::MaxValue]
}; };
Ok(entries) Ok(entries)
} }

View File

@@ -48,17 +48,21 @@ pub trait PartitionRule: Sync + Send {
) -> Result<HashMap<RegionNumber, RegionMask>>; ) -> Result<HashMap<RegionNumber, RegionMask>>;
} }
/// The right bound(exclusive) of partition range. /// The bound of one partition.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum PartitionBound { pub enum PartitionBound {
/// Deprecated since 0.9.0.
Value(Value), Value(Value),
/// Deprecated since 0.15.0.
MaxValue, MaxValue,
Expr(crate::expr::PartitionExpr), Expr(crate::expr::PartitionExpr),
} }
/// The partition definition of one table.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionDef { pub struct PartitionDef {
partition_columns: Vec<String>, partition_columns: Vec<String>,
/// Each element represents one partition.
partition_bounds: Vec<PartitionBound>, partition_bounds: Vec<PartitionBound>,
} }