feat: distributed alter table in region server (#2311)

* feat: distributed alter table in region server

* rebase
This commit is contained in:
LFC
2023-09-05 14:07:50 +08:00
committed by Ruihang Xia
parent 922e342b63
commit 711e27d9fa
33 changed files with 902 additions and 381 deletions

View File

@@ -229,15 +229,16 @@ impl CatalogManager for FrontendCatalogManager {
None
}
})
.collect();
.collect::<Vec<_>>();
let column_defs = expr_factory::column_schemas_to_defs(request.schema.column_schemas)
.map_err(|e| {
InvalidSystemTableDefSnafu {
err_msg: e.to_string(),
}
.build()
})?;
let column_defs =
expr_factory::column_schemas_to_defs(request.schema.column_schemas, &primary_keys)
.map_err(|e| {
InvalidSystemTableDefSnafu {
err_msg: e.to_string(),
}
.build()
})?;
let mut create_table = CreateTableExpr {
catalog_name: request.catalog_name,

View File

@@ -18,7 +18,7 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, CreateTableExpr, DropColumn,
DropColumns, RenameTable,
DropColumns, RenameTable, SemanticType,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
@@ -106,12 +106,14 @@ pub(crate) async fn create_external_expr(
serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
);
let primary_keys = vec![];
let expr = CreateTableExpr {
catalog_name,
schema_name,
table_name,
desc: "".to_string(),
column_defs: column_schemas_to_defs(schema.column_schemas)?,
column_defs: column_schemas_to_defs(schema.column_schemas, &primary_keys)?,
time_index: "".to_string(),
primary_keys: vec![],
create_if_not_exists: create.if_not_exists,
@@ -135,14 +137,17 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul
&TableOptions::try_from(&to_lowercase_options_map(&create.options))
.context(UnrecognizedTableOptionSnafu)?,
);
let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
let expr = CreateTableExpr {
catalog_name,
schema_name,
table_name,
desc: "".to_string(),
column_defs: columns_to_expr(&create.columns, &time_index)?,
column_defs: columns_to_expr(&create.columns, &time_index, &primary_keys)?,
time_index,
primary_keys: find_primary_keys(&create.columns, &create.constraints)?,
primary_keys,
create_if_not_exists: create.if_not_exists,
table_options,
table_id: None,
@@ -231,16 +236,22 @@ pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
Ok(time_index.first().unwrap().to_string())
}
fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result<Vec<api::v1::ColumnDef>> {
fn columns_to_expr(
column_defs: &[ColumnDef],
time_index: &str,
primary_keys: &[String],
) -> Result<Vec<api::v1::ColumnDef>> {
let column_schemas = column_defs
.iter()
.map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu))
.collect::<Result<Vec<ColumnSchema>>>()?;
column_schemas_to_defs(column_schemas)
column_schemas_to_defs(column_schemas, primary_keys)
}
pub(crate) fn column_schemas_to_defs(
column_schemas: Vec<ColumnSchema>,
primary_keys: &[String],
) -> Result<Vec<api::v1::ColumnDef>> {
let column_datatypes = column_schemas
.iter()
@@ -255,9 +266,17 @@ pub(crate) fn column_schemas_to_defs(
.iter()
.zip(column_datatypes)
.map(|(schema, datatype)| {
let semantic_type = if schema.is_time_index() {
SemanticType::Timestamp
} else if primary_keys.contains(&schema.name) {
SemanticType::Tag
} else {
SemanticType::Field
} as i32;
Ok(api::v1::ColumnDef {
name: schema.name.clone(),
datatype: datatype as i32,
data_type: datatype as i32,
is_nullable: schema.is_nullable(),
default_constraint: match schema.default_constraint() {
None => vec![],
@@ -269,6 +288,7 @@ pub(crate) fn column_schemas_to_defs(
})?
}
},
semantic_type,
})
})
.collect()
@@ -300,7 +320,6 @@ pub(crate) fn to_alter_expr(
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
),
is_key: false,
location: location.as_ref().map(From::from),
}],
}),
@@ -319,7 +338,6 @@ pub(crate) fn to_alter_expr(
schema_name,
table_name,
kind: Some(kind),
..Default::default()
})
}

View File

@@ -184,7 +184,6 @@ impl<'a> Inserter<'a> {
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::AddColumns(add_columns)),
..Default::default()
};
let req = Request::Ddl(DdlRequest {

View File

@@ -407,7 +407,7 @@ impl DistInstance {
Ok(())
}
async fn handle_alter_table(&self, mut expr: AlterExpr) -> Result<Output> {
async fn handle_alter_table(&self, expr: AlterExpr) -> Result<Output> {
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME
} else {
@@ -432,8 +432,6 @@ impl DistInstance {
})?;
let table_id = table.table_info().ident.table_id;
expr.table_id = Some(api::v1::TableId { id: table_id });
self.verify_alter(table_id, table.table_info(), expr.clone())?;
let req = SubmitDdlTaskRequest {
@@ -824,7 +822,7 @@ fn find_partition_entries(
for column in column_defs {
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.datatype).context(ColumnDataTypeSnafu)?,
ColumnDataTypeWrapper::try_new(column.data_type).context(ColumnDataTypeSnafu)?,
);
column_name_and_type.push((column_name, data_type));
}