Compare commits

...

1 Commits

Author SHA1 Message Date
Ruihang Xia
afc3f88240 only use one optional Partition definition on creating table
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-07-16 06:44:55 +08:00
9 changed files with 61 additions and 62 deletions

View File

@@ -113,15 +113,19 @@ impl TableMetadataAllocator {
table_id: TableId,
task: &CreateTableTask,
) -> Result<PhysicalTableRouteValue> {
let regions = task.partitions.len();
let num_regions = task
.partitions
.as_ref()
.map(|p| p.value_list.len())
.unwrap_or(1);
ensure!(
regions > 0,
num_regions > 0,
error::UnexpectedSnafu {
err_msg: "The number of partitions must be greater than 0"
}
);
let peers = self.peer_allocator.alloc(regions).await?;
let peers = self.peer_allocator.alloc(num_regions).await?;
debug!("Allocated peers {:?} for table {}", peers, table_id);
let region_routes = task
.partitions

View File

@@ -21,7 +21,6 @@ pub mod flownode_handler;
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use api::v1::meta::Partition;
use api::v1::{ColumnDataType, SemanticType};
use common_procedure::Status;
use datatypes::prelude::ConcreteDataType;
@@ -145,10 +144,7 @@ pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
partitions: None,
table_info,
}
}
@@ -183,10 +179,7 @@ pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
partitions: None,
table_info,
}
}

View File

@@ -15,7 +15,6 @@
use std::collections::HashMap;
use api::v1::column_def::try_as_column_schema;
use api::v1::meta::Partition;
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
use chrono::DateTime;
use common_catalog::consts::{
@@ -175,10 +174,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
partitions: None,
table_info,
}
}

View File

@@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::meta::{Partition, Peer};
use api::v1::meta::Peer;
use api::v1::region::{region_request, RegionRequest};
use api::v1::{ColumnDataType, SemanticType};
use common_error::ext::ErrorExt;
@@ -141,10 +141,7 @@ pub(crate) fn test_create_table_task(name: &str) -> CreateTableTask {
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
partitions: None,
table_info,
}
}
@@ -218,7 +215,7 @@ async fn test_on_prepare_with_no_partition_err() {
let node_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(node_manager);
let mut task = test_create_table_task("foo");
task.partitions = vec![];
task.partitions = None;
task.create_table.create_if_not_exists = true;
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();

View File

@@ -96,7 +96,7 @@ impl DdlTask {
/// Creates a [`DdlTask`] to create a table.
pub fn new_create_table(
expr: CreateTableExpr,
partitions: Vec<Partition>,
partitions: Option<Partition>,
table_info: RawTableInfo,
) -> Self {
DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
@@ -107,7 +107,7 @@ impl DdlTask {
DdlTask::CreateLogicalTables(
table_data
.into_iter()
.map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
.map(|(expr, table_info)| CreateTableTask::new(expr, None, table_info))
.collect(),
)
}
@@ -606,7 +606,10 @@ impl From<DropTableTask> for PbDropTableTask {
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
pub create_table: CreateTableExpr,
pub partitions: Vec<Partition>,
/// The partitions of the table.
///
/// If the table is created with a single region (not partitioned), this field is `None`.
pub partitions: Option<Partition>,
pub table_info: RawTableInfo,
}
@@ -620,7 +623,7 @@ impl TryFrom<PbCreateTableTask> for CreateTableTask {
pb.create_table.context(error::InvalidProtoMsgSnafu {
err_msg: "expected create table",
})?,
pb.partitions,
pb.partitions.first().cloned(),
table_info,
))
}
@@ -633,7 +636,10 @@ impl TryFrom<CreateTableTask> for PbCreateTableTask {
Ok(PbCreateTableTask {
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
create_table: Some(task.create_table),
partitions: task.partitions,
partitions: match task.partitions {
Some(p) => vec![p],
None => vec![],
},
})
}
}
@@ -641,7 +647,7 @@ impl TryFrom<CreateTableTask> for PbCreateTableTask {
impl CreateTableTask {
pub fn new(
expr: CreateTableExpr,
partitions: Vec<Partition>,
partitions: Option<Partition>,
table_info: RawTableInfo,
) -> CreateTableTask {
CreateTableTask {
@@ -701,7 +707,10 @@ impl Serialize for CreateTableTask {
let pb = PbCreateTableTask {
create_table: Some(self.create_table.clone()),
partitions: self.partitions.clone(),
partitions: match &self.partitions {
Some(p) => vec![p.clone()],
None => vec![],
},
table_info,
};
let buf = pb.encode_to_vec();
@@ -1315,7 +1324,7 @@ mod tests {
let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
let task = CreateTableTask::new(
CreateTableExpr::default(),
Vec::new(),
None,
RawTableInfo::from(table_info),
);
@@ -1411,8 +1420,7 @@ mod tests {
..Default::default()
};
let mut create_table_task =
CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
let mut create_table_task = CreateTableTask::new(create_table_expr, None, raw_table_info);
// Call the sort_columns method
create_table_task.sort_columns();

View File

@@ -391,6 +391,9 @@ impl From<Region> for PbRegion {
}
}
/// Serialized version of `PartitionDef`.
///
/// Represent the entire partition part of one table
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Partition {
#[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]

View File

@@ -15,7 +15,6 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use api::v1::meta::Partition;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef};
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType};
@@ -84,14 +83,7 @@ fn create_table_task(table_name: Option<&str>) -> CreateTableTask {
.into();
let table_info = build_raw_table_info_from_expr(&expr);
CreateTableTask::new(
expr,
vec![Partition {
column_list: vec![],
value_list: vec![],
}],
table_info,
)
CreateTableTask::new(expr, None, table_info)
}
#[test]

View File

@@ -1389,12 +1389,11 @@ impl StatementExecutor {
async fn create_table_procedure(
&self,
create_table: CreateTableExpr,
partitions: Vec<Partition>,
partitions: Option<Partition>,
table_info: RawTableInfo,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions.into_iter().map(Into::into).collect();
let partitions = partitions.map(|p| p.into()); // to PbPartition
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_table(create_table, partitions, table_info),
@@ -1590,7 +1589,7 @@ fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: &QueryContextRef,
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
) -> Result<(Option<MetaPartition>, Vec<String>)> {
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
// the partition column, and create only one partition.
let partition_columns = find_partition_columns(&partitions)?;
@@ -1600,23 +1599,26 @@ fn parse_partitions(
// Validates partition
let mut exprs = vec![];
for partition in &partition_entries {
for bound in partition {
if let PartitionBound::Expr(expr) = bound {
exprs.push(expr.clone());
}
if let PartitionBound::Expr(expr) = partition {
exprs.push(expr.clone());
}
}
MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
.context(InvalidPartitionSnafu)?;
Ok((
partition_entries
.into_iter()
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
.collect::<std::result::Result<_, _>>()
let meta_partition = if partition_entries.is_empty() {
None
} else {
Some(
MetaPartition::try_from(PartitionDef::new(
partition_columns.clone(),
partition_entries,
))
.context(DeserializePartitionSnafu)?,
partition_columns,
))
)
};
Ok((meta_partition, partition_columns))
}
fn create_table_info(
@@ -1727,7 +1729,7 @@ fn find_partition_entries(
partitions: &Option<Partitions>,
partition_columns: &[String],
query_ctx: &QueryContextRef,
) -> Result<Vec<Vec<PartitionBound>>> {
) -> Result<Vec<PartitionBound>> {
let entries = if let Some(partitions) = partitions {
// extract concrete data type of partition columns
let column_defs = partition_columns
@@ -1756,17 +1758,17 @@ fn find_partition_entries(
for partition in &partitions.exprs {
let partition_expr =
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
partition_exprs.push(PartitionBound::Expr(partition_expr));
}
// fallback for no expr
if partition_exprs.is_empty() {
partition_exprs.push(vec![PartitionBound::MaxValue]);
partition_exprs.push(PartitionBound::MaxValue);
}
partition_exprs
} else {
vec![vec![PartitionBound::MaxValue]]
vec![PartitionBound::MaxValue]
};
Ok(entries)
}

View File

@@ -48,17 +48,21 @@ pub trait PartitionRule: Sync + Send {
) -> Result<HashMap<RegionNumber, RegionMask>>;
}
/// The right bound(exclusive) of partition range.
/// The bound of one partition.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum PartitionBound {
/// Deprecated since 0.9.0.
Value(Value),
/// Deprecated since 0.15.0.
MaxValue,
Expr(crate::expr::PartitionExpr),
}
/// The partition definition of one table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionDef {
partition_columns: Vec<String>,
/// Each element represents one partition.
partition_bounds: Vec<PartitionBound>,
}