mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 22:49:58 +00:00
Compare commits
1 Commits
docs/vecto
...
correct-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
afc3f88240 |
@@ -113,15 +113,19 @@ impl TableMetadataAllocator {
|
|||||||
table_id: TableId,
|
table_id: TableId,
|
||||||
task: &CreateTableTask,
|
task: &CreateTableTask,
|
||||||
) -> Result<PhysicalTableRouteValue> {
|
) -> Result<PhysicalTableRouteValue> {
|
||||||
let regions = task.partitions.len();
|
let num_regions = task
|
||||||
|
.partitions
|
||||||
|
.as_ref()
|
||||||
|
.map(|p| p.value_list.len())
|
||||||
|
.unwrap_or(1);
|
||||||
ensure!(
|
ensure!(
|
||||||
regions > 0,
|
num_regions > 0,
|
||||||
error::UnexpectedSnafu {
|
error::UnexpectedSnafu {
|
||||||
err_msg: "The number of partitions must be greater than 0"
|
err_msg: "The number of partitions must be greater than 0"
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
let peers = self.peer_allocator.alloc(regions).await?;
|
let peers = self.peer_allocator.alloc(num_regions).await?;
|
||||||
debug!("Allocated peers {:?} for table {}", peers, table_id);
|
debug!("Allocated peers {:?} for table {}", peers, table_id);
|
||||||
let region_routes = task
|
let region_routes = task
|
||||||
.partitions
|
.partitions
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ pub mod flownode_handler;
|
|||||||
use std::assert_matches::assert_matches;
|
use std::assert_matches::assert_matches;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use api::v1::meta::Partition;
|
|
||||||
use api::v1::{ColumnDataType, SemanticType};
|
use api::v1::{ColumnDataType, SemanticType};
|
||||||
use common_procedure::Status;
|
use common_procedure::Status;
|
||||||
use datatypes::prelude::ConcreteDataType;
|
use datatypes::prelude::ConcreteDataType;
|
||||||
@@ -145,10 +144,7 @@ pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
|
|||||||
CreateTableTask {
|
CreateTableTask {
|
||||||
create_table,
|
create_table,
|
||||||
// Single region
|
// Single region
|
||||||
partitions: vec![Partition {
|
partitions: None,
|
||||||
column_list: vec![],
|
|
||||||
value_list: vec![],
|
|
||||||
}],
|
|
||||||
table_info,
|
table_info,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -183,10 +179,7 @@ pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
|
|||||||
CreateTableTask {
|
CreateTableTask {
|
||||||
create_table,
|
create_table,
|
||||||
// Single region
|
// Single region
|
||||||
partitions: vec![Partition {
|
partitions: None,
|
||||||
column_list: vec![],
|
|
||||||
value_list: vec![],
|
|
||||||
}],
|
|
||||||
table_info,
|
table_info,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,7 +15,6 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use api::v1::column_def::try_as_column_schema;
|
use api::v1::column_def::try_as_column_schema;
|
||||||
use api::v1::meta::Partition;
|
|
||||||
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
|
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
|
||||||
use chrono::DateTime;
|
use chrono::DateTime;
|
||||||
use common_catalog::consts::{
|
use common_catalog::consts::{
|
||||||
@@ -175,10 +174,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask
|
|||||||
CreateTableTask {
|
CreateTableTask {
|
||||||
create_table,
|
create_table,
|
||||||
// Single region
|
// Single region
|
||||||
partitions: vec![Partition {
|
partitions: None,
|
||||||
column_list: vec![],
|
|
||||||
value_list: vec![],
|
|
||||||
}],
|
|
||||||
table_info,
|
table_info,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ use std::collections::HashMap;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use api::region::RegionResponse;
|
use api::region::RegionResponse;
|
||||||
use api::v1::meta::{Partition, Peer};
|
use api::v1::meta::Peer;
|
||||||
use api::v1::region::{region_request, RegionRequest};
|
use api::v1::region::{region_request, RegionRequest};
|
||||||
use api::v1::{ColumnDataType, SemanticType};
|
use api::v1::{ColumnDataType, SemanticType};
|
||||||
use common_error::ext::ErrorExt;
|
use common_error::ext::ErrorExt;
|
||||||
@@ -141,10 +141,7 @@ pub(crate) fn test_create_table_task(name: &str) -> CreateTableTask {
|
|||||||
CreateTableTask {
|
CreateTableTask {
|
||||||
create_table,
|
create_table,
|
||||||
// Single region
|
// Single region
|
||||||
partitions: vec![Partition {
|
partitions: None,
|
||||||
column_list: vec![],
|
|
||||||
value_list: vec![],
|
|
||||||
}],
|
|
||||||
table_info,
|
table_info,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -218,7 +215,7 @@ async fn test_on_prepare_with_no_partition_err() {
|
|||||||
let node_manager = Arc::new(MockDatanodeManager::new(()));
|
let node_manager = Arc::new(MockDatanodeManager::new(()));
|
||||||
let ddl_context = new_ddl_context(node_manager);
|
let ddl_context = new_ddl_context(node_manager);
|
||||||
let mut task = test_create_table_task("foo");
|
let mut task = test_create_table_task("foo");
|
||||||
task.partitions = vec![];
|
task.partitions = None;
|
||||||
task.create_table.create_if_not_exists = true;
|
task.create_table.create_if_not_exists = true;
|
||||||
let mut procedure = CreateTableProcedure::new(task, ddl_context);
|
let mut procedure = CreateTableProcedure::new(task, ddl_context);
|
||||||
let err = procedure.on_prepare().await.unwrap_err();
|
let err = procedure.on_prepare().await.unwrap_err();
|
||||||
|
|||||||
@@ -96,7 +96,7 @@ impl DdlTask {
|
|||||||
/// Creates a [`DdlTask`] to create a table.
|
/// Creates a [`DdlTask`] to create a table.
|
||||||
pub fn new_create_table(
|
pub fn new_create_table(
|
||||||
expr: CreateTableExpr,
|
expr: CreateTableExpr,
|
||||||
partitions: Vec<Partition>,
|
partitions: Option<Partition>,
|
||||||
table_info: RawTableInfo,
|
table_info: RawTableInfo,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
|
DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
|
||||||
@@ -107,7 +107,7 @@ impl DdlTask {
|
|||||||
DdlTask::CreateLogicalTables(
|
DdlTask::CreateLogicalTables(
|
||||||
table_data
|
table_data
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
|
.map(|(expr, table_info)| CreateTableTask::new(expr, None, table_info))
|
||||||
.collect(),
|
.collect(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -606,7 +606,10 @@ impl From<DropTableTask> for PbDropTableTask {
|
|||||||
#[derive(Debug, PartialEq, Clone)]
|
#[derive(Debug, PartialEq, Clone)]
|
||||||
pub struct CreateTableTask {
|
pub struct CreateTableTask {
|
||||||
pub create_table: CreateTableExpr,
|
pub create_table: CreateTableExpr,
|
||||||
pub partitions: Vec<Partition>,
|
/// The partitions of the table.
|
||||||
|
///
|
||||||
|
/// If the table is created with a single region (not partitioned), this field is `None`.
|
||||||
|
pub partitions: Option<Partition>,
|
||||||
pub table_info: RawTableInfo,
|
pub table_info: RawTableInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -620,7 +623,7 @@ impl TryFrom<PbCreateTableTask> for CreateTableTask {
|
|||||||
pb.create_table.context(error::InvalidProtoMsgSnafu {
|
pb.create_table.context(error::InvalidProtoMsgSnafu {
|
||||||
err_msg: "expected create table",
|
err_msg: "expected create table",
|
||||||
})?,
|
})?,
|
||||||
pb.partitions,
|
pb.partitions.first().cloned(),
|
||||||
table_info,
|
table_info,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
@@ -633,7 +636,10 @@ impl TryFrom<CreateTableTask> for PbCreateTableTask {
|
|||||||
Ok(PbCreateTableTask {
|
Ok(PbCreateTableTask {
|
||||||
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
|
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
|
||||||
create_table: Some(task.create_table),
|
create_table: Some(task.create_table),
|
||||||
partitions: task.partitions,
|
partitions: match task.partitions {
|
||||||
|
Some(p) => vec![p],
|
||||||
|
None => vec![],
|
||||||
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -641,7 +647,7 @@ impl TryFrom<CreateTableTask> for PbCreateTableTask {
|
|||||||
impl CreateTableTask {
|
impl CreateTableTask {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
expr: CreateTableExpr,
|
expr: CreateTableExpr,
|
||||||
partitions: Vec<Partition>,
|
partitions: Option<Partition>,
|
||||||
table_info: RawTableInfo,
|
table_info: RawTableInfo,
|
||||||
) -> CreateTableTask {
|
) -> CreateTableTask {
|
||||||
CreateTableTask {
|
CreateTableTask {
|
||||||
@@ -701,7 +707,10 @@ impl Serialize for CreateTableTask {
|
|||||||
|
|
||||||
let pb = PbCreateTableTask {
|
let pb = PbCreateTableTask {
|
||||||
create_table: Some(self.create_table.clone()),
|
create_table: Some(self.create_table.clone()),
|
||||||
partitions: self.partitions.clone(),
|
partitions: match &self.partitions {
|
||||||
|
Some(p) => vec![p.clone()],
|
||||||
|
None => vec![],
|
||||||
|
},
|
||||||
table_info,
|
table_info,
|
||||||
};
|
};
|
||||||
let buf = pb.encode_to_vec();
|
let buf = pb.encode_to_vec();
|
||||||
@@ -1315,7 +1324,7 @@ mod tests {
|
|||||||
let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
|
let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
|
||||||
let task = CreateTableTask::new(
|
let task = CreateTableTask::new(
|
||||||
CreateTableExpr::default(),
|
CreateTableExpr::default(),
|
||||||
Vec::new(),
|
None,
|
||||||
RawTableInfo::from(table_info),
|
RawTableInfo::from(table_info),
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -1411,8 +1420,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut create_table_task =
|
let mut create_table_task = CreateTableTask::new(create_table_expr, None, raw_table_info);
|
||||||
CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
|
|
||||||
|
|
||||||
// Call the sort_columns method
|
// Call the sort_columns method
|
||||||
create_table_task.sort_columns();
|
create_table_task.sort_columns();
|
||||||
|
|||||||
@@ -391,6 +391,9 @@ impl From<Region> for PbRegion {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Serialized version of `PartitionDef`.
|
||||||
|
///
|
||||||
|
/// Represent the entire partition part of one table
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
|
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
|
||||||
pub struct Partition {
|
pub struct Partition {
|
||||||
#[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
|
#[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
|
||||||
|
|||||||
@@ -15,7 +15,6 @@
|
|||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use api::v1::meta::Partition;
|
|
||||||
use api::v1::region::region_request::Body as PbRegionRequest;
|
use api::v1::region::region_request::Body as PbRegionRequest;
|
||||||
use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef};
|
use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef};
|
||||||
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType};
|
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType};
|
||||||
@@ -84,14 +83,7 @@ fn create_table_task(table_name: Option<&str>) -> CreateTableTask {
|
|||||||
.into();
|
.into();
|
||||||
|
|
||||||
let table_info = build_raw_table_info_from_expr(&expr);
|
let table_info = build_raw_table_info_from_expr(&expr);
|
||||||
CreateTableTask::new(
|
CreateTableTask::new(expr, None, table_info)
|
||||||
expr,
|
|
||||||
vec![Partition {
|
|
||||||
column_list: vec![],
|
|
||||||
value_list: vec![],
|
|
||||||
}],
|
|
||||||
table_info,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -1389,12 +1389,11 @@ impl StatementExecutor {
|
|||||||
async fn create_table_procedure(
|
async fn create_table_procedure(
|
||||||
&self,
|
&self,
|
||||||
create_table: CreateTableExpr,
|
create_table: CreateTableExpr,
|
||||||
partitions: Vec<Partition>,
|
partitions: Option<Partition>,
|
||||||
table_info: RawTableInfo,
|
table_info: RawTableInfo,
|
||||||
query_context: QueryContextRef,
|
query_context: QueryContextRef,
|
||||||
) -> Result<SubmitDdlTaskResponse> {
|
) -> Result<SubmitDdlTaskResponse> {
|
||||||
let partitions = partitions.into_iter().map(Into::into).collect();
|
let partitions = partitions.map(|p| p.into()); // to PbPartition
|
||||||
|
|
||||||
let request = SubmitDdlTaskRequest {
|
let request = SubmitDdlTaskRequest {
|
||||||
query_context,
|
query_context,
|
||||||
task: DdlTask::new_create_table(create_table, partitions, table_info),
|
task: DdlTask::new_create_table(create_table, partitions, table_info),
|
||||||
@@ -1590,7 +1589,7 @@ fn parse_partitions(
|
|||||||
create_table: &CreateTableExpr,
|
create_table: &CreateTableExpr,
|
||||||
partitions: Option<Partitions>,
|
partitions: Option<Partitions>,
|
||||||
query_ctx: &QueryContextRef,
|
query_ctx: &QueryContextRef,
|
||||||
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
|
) -> Result<(Option<MetaPartition>, Vec<String>)> {
|
||||||
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
|
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
|
||||||
// the partition column, and create only one partition.
|
// the partition column, and create only one partition.
|
||||||
let partition_columns = find_partition_columns(&partitions)?;
|
let partition_columns = find_partition_columns(&partitions)?;
|
||||||
@@ -1600,23 +1599,26 @@ fn parse_partitions(
|
|||||||
// Validates partition
|
// Validates partition
|
||||||
let mut exprs = vec![];
|
let mut exprs = vec![];
|
||||||
for partition in &partition_entries {
|
for partition in &partition_entries {
|
||||||
for bound in partition {
|
if let PartitionBound::Expr(expr) = partition {
|
||||||
if let PartitionBound::Expr(expr) = bound {
|
exprs.push(expr.clone());
|
||||||
exprs.push(expr.clone());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
|
MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
|
||||||
.context(InvalidPartitionSnafu)?;
|
.context(InvalidPartitionSnafu)?;
|
||||||
|
|
||||||
Ok((
|
let meta_partition = if partition_entries.is_empty() {
|
||||||
partition_entries
|
None
|
||||||
.into_iter()
|
} else {
|
||||||
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
|
Some(
|
||||||
.collect::<std::result::Result<_, _>>()
|
MetaPartition::try_from(PartitionDef::new(
|
||||||
|
partition_columns.clone(),
|
||||||
|
partition_entries,
|
||||||
|
))
|
||||||
.context(DeserializePartitionSnafu)?,
|
.context(DeserializePartitionSnafu)?,
|
||||||
partition_columns,
|
)
|
||||||
))
|
};
|
||||||
|
|
||||||
|
Ok((meta_partition, partition_columns))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_table_info(
|
fn create_table_info(
|
||||||
@@ -1727,7 +1729,7 @@ fn find_partition_entries(
|
|||||||
partitions: &Option<Partitions>,
|
partitions: &Option<Partitions>,
|
||||||
partition_columns: &[String],
|
partition_columns: &[String],
|
||||||
query_ctx: &QueryContextRef,
|
query_ctx: &QueryContextRef,
|
||||||
) -> Result<Vec<Vec<PartitionBound>>> {
|
) -> Result<Vec<PartitionBound>> {
|
||||||
let entries = if let Some(partitions) = partitions {
|
let entries = if let Some(partitions) = partitions {
|
||||||
// extract concrete data type of partition columns
|
// extract concrete data type of partition columns
|
||||||
let column_defs = partition_columns
|
let column_defs = partition_columns
|
||||||
@@ -1756,17 +1758,17 @@ fn find_partition_entries(
|
|||||||
for partition in &partitions.exprs {
|
for partition in &partitions.exprs {
|
||||||
let partition_expr =
|
let partition_expr =
|
||||||
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
|
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
|
||||||
partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
|
partition_exprs.push(PartitionBound::Expr(partition_expr));
|
||||||
}
|
}
|
||||||
|
|
||||||
// fallback for no expr
|
// fallback for no expr
|
||||||
if partition_exprs.is_empty() {
|
if partition_exprs.is_empty() {
|
||||||
partition_exprs.push(vec![PartitionBound::MaxValue]);
|
partition_exprs.push(PartitionBound::MaxValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
partition_exprs
|
partition_exprs
|
||||||
} else {
|
} else {
|
||||||
vec![vec![PartitionBound::MaxValue]]
|
vec![PartitionBound::MaxValue]
|
||||||
};
|
};
|
||||||
Ok(entries)
|
Ok(entries)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,17 +48,21 @@ pub trait PartitionRule: Sync + Send {
|
|||||||
) -> Result<HashMap<RegionNumber, RegionMask>>;
|
) -> Result<HashMap<RegionNumber, RegionMask>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The right bound(exclusive) of partition range.
|
/// The bound of one partition.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||||
pub enum PartitionBound {
|
pub enum PartitionBound {
|
||||||
|
/// Deprecated since 0.9.0.
|
||||||
Value(Value),
|
Value(Value),
|
||||||
|
/// Deprecated since 0.15.0.
|
||||||
MaxValue,
|
MaxValue,
|
||||||
Expr(crate::expr::PartitionExpr),
|
Expr(crate::expr::PartitionExpr),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The partition definition of one table.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct PartitionDef {
|
pub struct PartitionDef {
|
||||||
partition_columns: Vec<String>,
|
partition_columns: Vec<String>,
|
||||||
|
/// Each element represents one partition.
|
||||||
partition_bounds: Vec<PartitionBound>,
|
partition_bounds: Vec<PartitionBound>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user