mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
1 Commits
c112cdf241
...
correct-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
afc3f88240 |
@@ -113,15 +113,19 @@ impl TableMetadataAllocator {
|
||||
table_id: TableId,
|
||||
task: &CreateTableTask,
|
||||
) -> Result<PhysicalTableRouteValue> {
|
||||
let regions = task.partitions.len();
|
||||
let num_regions = task
|
||||
.partitions
|
||||
.as_ref()
|
||||
.map(|p| p.value_list.len())
|
||||
.unwrap_or(1);
|
||||
ensure!(
|
||||
regions > 0,
|
||||
num_regions > 0,
|
||||
error::UnexpectedSnafu {
|
||||
err_msg: "The number of partitions must be greater than 0"
|
||||
}
|
||||
);
|
||||
|
||||
let peers = self.peer_allocator.alloc(regions).await?;
|
||||
let peers = self.peer_allocator.alloc(num_regions).await?;
|
||||
debug!("Allocated peers {:?} for table {}", peers, table_id);
|
||||
let region_routes = task
|
||||
.partitions
|
||||
|
||||
@@ -21,7 +21,6 @@ pub mod flownode_handler;
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::meta::Partition;
|
||||
use api::v1::{ColumnDataType, SemanticType};
|
||||
use common_procedure::Status;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
@@ -145,10 +144,7 @@ pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
partitions: None,
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
@@ -183,10 +179,7 @@ pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
partitions: None,
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::column_def::try_as_column_schema;
|
||||
use api::v1::meta::Partition;
|
||||
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
|
||||
use chrono::DateTime;
|
||||
use common_catalog::consts::{
|
||||
@@ -175,10 +174,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
partitions: None,
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::meta::{Partition, Peer};
|
||||
use api::v1::meta::Peer;
|
||||
use api::v1::region::{region_request, RegionRequest};
|
||||
use api::v1::{ColumnDataType, SemanticType};
|
||||
use common_error::ext::ErrorExt;
|
||||
@@ -141,10 +141,7 @@ pub(crate) fn test_create_table_task(name: &str) -> CreateTableTask {
|
||||
CreateTableTask {
|
||||
create_table,
|
||||
// Single region
|
||||
partitions: vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
partitions: None,
|
||||
table_info,
|
||||
}
|
||||
}
|
||||
@@ -218,7 +215,7 @@ async fn test_on_prepare_with_no_partition_err() {
|
||||
let node_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(node_manager);
|
||||
let mut task = test_create_table_task("foo");
|
||||
task.partitions = vec![];
|
||||
task.partitions = None;
|
||||
task.create_table.create_if_not_exists = true;
|
||||
let mut procedure = CreateTableProcedure::new(task, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
|
||||
@@ -96,7 +96,7 @@ impl DdlTask {
|
||||
/// Creates a [`DdlTask`] to create a table.
|
||||
pub fn new_create_table(
|
||||
expr: CreateTableExpr,
|
||||
partitions: Vec<Partition>,
|
||||
partitions: Option<Partition>,
|
||||
table_info: RawTableInfo,
|
||||
) -> Self {
|
||||
DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
|
||||
@@ -107,7 +107,7 @@ impl DdlTask {
|
||||
DdlTask::CreateLogicalTables(
|
||||
table_data
|
||||
.into_iter()
|
||||
.map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
|
||||
.map(|(expr, table_info)| CreateTableTask::new(expr, None, table_info))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
@@ -606,7 +606,10 @@ impl From<DropTableTask> for PbDropTableTask {
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub struct CreateTableTask {
|
||||
pub create_table: CreateTableExpr,
|
||||
pub partitions: Vec<Partition>,
|
||||
/// The partitions of the table.
|
||||
///
|
||||
/// If the table is created with a single region (not partitioned), this field is `None`.
|
||||
pub partitions: Option<Partition>,
|
||||
pub table_info: RawTableInfo,
|
||||
}
|
||||
|
||||
@@ -620,7 +623,7 @@ impl TryFrom<PbCreateTableTask> for CreateTableTask {
|
||||
pb.create_table.context(error::InvalidProtoMsgSnafu {
|
||||
err_msg: "expected create table",
|
||||
})?,
|
||||
pb.partitions,
|
||||
pb.partitions.first().cloned(),
|
||||
table_info,
|
||||
))
|
||||
}
|
||||
@@ -633,7 +636,10 @@ impl TryFrom<CreateTableTask> for PbCreateTableTask {
|
||||
Ok(PbCreateTableTask {
|
||||
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
|
||||
create_table: Some(task.create_table),
|
||||
partitions: task.partitions,
|
||||
partitions: match task.partitions {
|
||||
Some(p) => vec![p],
|
||||
None => vec![],
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -641,7 +647,7 @@ impl TryFrom<CreateTableTask> for PbCreateTableTask {
|
||||
impl CreateTableTask {
|
||||
pub fn new(
|
||||
expr: CreateTableExpr,
|
||||
partitions: Vec<Partition>,
|
||||
partitions: Option<Partition>,
|
||||
table_info: RawTableInfo,
|
||||
) -> CreateTableTask {
|
||||
CreateTableTask {
|
||||
@@ -701,7 +707,10 @@ impl Serialize for CreateTableTask {
|
||||
|
||||
let pb = PbCreateTableTask {
|
||||
create_table: Some(self.create_table.clone()),
|
||||
partitions: self.partitions.clone(),
|
||||
partitions: match &self.partitions {
|
||||
Some(p) => vec![p.clone()],
|
||||
None => vec![],
|
||||
},
|
||||
table_info,
|
||||
};
|
||||
let buf = pb.encode_to_vec();
|
||||
@@ -1315,7 +1324,7 @@ mod tests {
|
||||
let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
|
||||
let task = CreateTableTask::new(
|
||||
CreateTableExpr::default(),
|
||||
Vec::new(),
|
||||
None,
|
||||
RawTableInfo::from(table_info),
|
||||
);
|
||||
|
||||
@@ -1411,8 +1420,7 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut create_table_task =
|
||||
CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
|
||||
let mut create_table_task = CreateTableTask::new(create_table_expr, None, raw_table_info);
|
||||
|
||||
// Call the sort_columns method
|
||||
create_table_task.sort_columns();
|
||||
|
||||
@@ -391,6 +391,9 @@ impl From<Region> for PbRegion {
|
||||
}
|
||||
}
|
||||
|
||||
/// Serialized version of `PartitionDef`.
|
||||
///
|
||||
/// Represent the entire partition part of one table
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
|
||||
pub struct Partition {
|
||||
#[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use api::v1::meta::Partition;
|
||||
use api::v1::region::region_request::Body as PbRegionRequest;
|
||||
use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef};
|
||||
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType};
|
||||
@@ -84,14 +83,7 @@ fn create_table_task(table_name: Option<&str>) -> CreateTableTask {
|
||||
.into();
|
||||
|
||||
let table_info = build_raw_table_info_from_expr(&expr);
|
||||
CreateTableTask::new(
|
||||
expr,
|
||||
vec![Partition {
|
||||
column_list: vec![],
|
||||
value_list: vec![],
|
||||
}],
|
||||
table_info,
|
||||
)
|
||||
CreateTableTask::new(expr, None, table_info)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1389,12 +1389,11 @@ impl StatementExecutor {
|
||||
async fn create_table_procedure(
|
||||
&self,
|
||||
create_table: CreateTableExpr,
|
||||
partitions: Vec<Partition>,
|
||||
partitions: Option<Partition>,
|
||||
table_info: RawTableInfo,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let partitions = partitions.into_iter().map(Into::into).collect();
|
||||
|
||||
let partitions = partitions.map(|p| p.into()); // to PbPartition
|
||||
let request = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_create_table(create_table, partitions, table_info),
|
||||
@@ -1590,7 +1589,7 @@ fn parse_partitions(
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
|
||||
) -> Result<(Option<MetaPartition>, Vec<String>)> {
|
||||
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
|
||||
// the partition column, and create only one partition.
|
||||
let partition_columns = find_partition_columns(&partitions)?;
|
||||
@@ -1600,23 +1599,26 @@ fn parse_partitions(
|
||||
// Validates partition
|
||||
let mut exprs = vec![];
|
||||
for partition in &partition_entries {
|
||||
for bound in partition {
|
||||
if let PartitionBound::Expr(expr) = bound {
|
||||
exprs.push(expr.clone());
|
||||
}
|
||||
if let PartitionBound::Expr(expr) = partition {
|
||||
exprs.push(expr.clone());
|
||||
}
|
||||
}
|
||||
MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
|
||||
.context(InvalidPartitionSnafu)?;
|
||||
|
||||
Ok((
|
||||
partition_entries
|
||||
.into_iter()
|
||||
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
|
||||
.collect::<std::result::Result<_, _>>()
|
||||
let meta_partition = if partition_entries.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
MetaPartition::try_from(PartitionDef::new(
|
||||
partition_columns.clone(),
|
||||
partition_entries,
|
||||
))
|
||||
.context(DeserializePartitionSnafu)?,
|
||||
partition_columns,
|
||||
))
|
||||
)
|
||||
};
|
||||
|
||||
Ok((meta_partition, partition_columns))
|
||||
}
|
||||
|
||||
fn create_table_info(
|
||||
@@ -1727,7 +1729,7 @@ fn find_partition_entries(
|
||||
partitions: &Option<Partitions>,
|
||||
partition_columns: &[String],
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<Vec<Vec<PartitionBound>>> {
|
||||
) -> Result<Vec<PartitionBound>> {
|
||||
let entries = if let Some(partitions) = partitions {
|
||||
// extract concrete data type of partition columns
|
||||
let column_defs = partition_columns
|
||||
@@ -1756,17 +1758,17 @@ fn find_partition_entries(
|
||||
for partition in &partitions.exprs {
|
||||
let partition_expr =
|
||||
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
|
||||
partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
|
||||
partition_exprs.push(PartitionBound::Expr(partition_expr));
|
||||
}
|
||||
|
||||
// fallback for no expr
|
||||
if partition_exprs.is_empty() {
|
||||
partition_exprs.push(vec![PartitionBound::MaxValue]);
|
||||
partition_exprs.push(PartitionBound::MaxValue);
|
||||
}
|
||||
|
||||
partition_exprs
|
||||
} else {
|
||||
vec![vec![PartitionBound::MaxValue]]
|
||||
vec![PartitionBound::MaxValue]
|
||||
};
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
@@ -48,17 +48,21 @@ pub trait PartitionRule: Sync + Send {
|
||||
) -> Result<HashMap<RegionNumber, RegionMask>>;
|
||||
}
|
||||
|
||||
/// The right bound(exclusive) of partition range.
|
||||
/// The bound of one partition.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
pub enum PartitionBound {
|
||||
/// Deprecated since 0.9.0.
|
||||
Value(Value),
|
||||
/// Deprecated since 0.15.0.
|
||||
MaxValue,
|
||||
Expr(crate::expr::PartitionExpr),
|
||||
}
|
||||
|
||||
/// The partition definition of one table.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PartitionDef {
|
||||
partition_columns: Vec<String>,
|
||||
/// Each element represents one partition.
|
||||
partition_bounds: Vec<PartitionBound>,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user