refactor: refactor alter table procedure (#3678)

* refactor: refactor alter table procedure

* chore: apply suggestions from CR

* chore: remove `alter_expr` and `alter_kind`
This commit is contained in:
Weny Xu
2024-04-09 18:35:51 +08:00
committed by GitHub
parent 2c0c7759ee
commit fda1523ced
16 changed files with 895 additions and 449 deletions

View File

@@ -140,7 +140,7 @@ impl TableMetadataBencher {
let start = Instant::now();
let _ = self
.table_metadata_manager
.rename_table(table_info.unwrap(), new_table_name)
.rename_table(&table_info.unwrap(), new_table_name)
.await;
start.elapsed()

View File

@@ -46,7 +46,7 @@ impl AlterLogicalTablesProcedure {
// Updates physical table's metadata
self.context
.table_metadata_manager
.update_table_info(physical_table_info.clone(), new_raw_table_info)
.update_table_info(physical_table_info, new_raw_table_info)
.await?;
Ok(())

View File

@@ -12,52 +12,47 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod check;
mod metadata;
mod region_request;
mod update_metadata;
use std::vec;
use api::v1::alter_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns,
RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{AlterExpr, RenameTable};
use api::v1::RenameTable;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc_expr::alter_expr_to_request;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey,
};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, info};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ColumnId, RegionId};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::requests::AlterKind;
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{self, ConvertAlterTableRequestSnafu, Error, InvalidProtoMsgSnafu, Result};
use crate::error::{Error, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
use crate::table_name::TableName;
/// The alter table procedure
pub struct AlterTableProcedure {
// The runtime context.
context: DdlContext,
// The serialized data.
data: AlterTableData,
/// proto alter Kind for adding/dropping columns.
kind: Option<alter_request::Kind>,
}
impl AlterTableProcedure {
@@ -65,123 +60,36 @@ impl AlterTableProcedure {
pub fn new(
cluster_id: u64,
table_id: TableId,
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
context: DdlContext,
) -> Result<Self> {
let alter_kind = task
.alter_table
.kind
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'kind' is absent",
})?;
let (kind, next_column_id) =
create_proto_alter_kind(&table_info_value.table_info, alter_kind)?;
debug!(
"New AlterTableProcedure, kind: {:?}, next_column_id: {:?}",
kind, next_column_id
);
task.validate()?;
Ok(Self {
context,
data: AlterTableData::new(task, table_info_value, cluster_id, next_column_id),
kind,
data: AlterTableData::new(task, table_id, cluster_id),
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let alter_kind = data
.task
.alter_table
.kind
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'kind' is absent",
})
.map_err(ProcedureError::external)?;
let (kind, next_column_id) =
create_proto_alter_kind(&data.table_info_value.table_info, alter_kind)
.map_err(ProcedureError::external)?;
assert_eq!(data.next_column_id, next_column_id);
Ok(AlterTableProcedure {
context,
data,
kind,
})
Ok(AlterTableProcedure { context, data })
}
// Checks whether the table exists.
async fn on_prepare(&mut self) -> Result<Status> {
let alter_expr = &self.alter_expr();
let catalog = &alter_expr.catalog_name;
let schema = &alter_expr.schema_name;
let alter_kind = self.alter_kind()?;
let manager = &self.context.table_metadata_manager;
if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name);
let exist = manager
.table_name_manager()
.exists(new_table_name_key)
.await?;
ensure!(
!exist,
error::TableAlreadyExistsSnafu {
table_name: TableName::from(new_table_name_key).to_string(),
}
)
}
let table_name_key = TableNameKey::new(catalog, schema, &alter_expr.table_name);
let exist = manager.table_name_manager().exists(table_name_key).await?;
ensure!(
exist,
error::TableNotFoundSnafu {
table_name: TableName::from(table_name_key).to_string()
}
);
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
self.check_alter().await?;
self.fill_table_info().await?;
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
if matches!(alter_kind, Kind::RenameTable { .. }) {
self.data.state = AlterTableState::UpdateMetadata;
} else {
self.data.state = AlterTableState::SubmitAlterRegionRequests;
};
Ok(Status::executing(true))
}
fn alter_expr(&self) -> &AlterExpr {
&self.data.task.alter_table
}
fn alter_kind(&self) -> Result<&Kind> {
self.alter_expr()
.kind
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'kind' is absent",
})
}
pub fn create_alter_region_request(&self, region_id: RegionId) -> Result<AlterRequest> {
let table_info = self.data.table_info();
Ok(AlterRequest {
region_id: region_id.as_u64(),
schema_version: table_info.ident.version,
kind: self.kind.clone(),
})
}
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let (_, physical_table_route) = self
@@ -200,30 +108,17 @@ impl AlterTableProcedure {
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
let request = self.make_alter_region_request(region_id)?;
debug!("Submitting {request:?} to {datanode}");
let datanode = datanode.clone();
let requester = requester.clone();
alter_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RequestOutdated {
// Treat request outdated as success.
// The engine will throw this code when the schema version not match.
// As this procedure has locked the table, the only reason for this error
// is procedure is succeeded before and is retrying.
return Err(add_peer_context_if_needed(datanode)(err));
}
}
Ok(())
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
@@ -238,91 +133,39 @@ impl AlterTableProcedure {
Ok(Status::executing(true))
}
/// Update table metadata for rename table operation.
async fn on_update_metadata_for_rename(&self, new_table_name: String) -> Result<()> {
let table_metadata_manager = &self.context.table_metadata_manager;
let current_table_info_value = self.data.table_info_value.clone();
table_metadata_manager
.rename_table(current_table_info_value, new_table_name)
.await?;
Ok(())
}
async fn on_update_metadata_for_alter(&self, new_table_info: RawTableInfo) -> Result<()> {
let table_metadata_manager = &self.context.table_metadata_manager;
let current_table_info_value = self.data.table_info_value.clone();
table_metadata_manager
.update_table_info(current_table_info_value, new_table_info)
.await?;
Ok(())
}
fn build_new_table_info(&self) -> Result<TableInfo> {
// Builds new_meta
let table_info = TableInfo::try_from(self.data.table_info().clone())
.context(error::ConvertRawTableInfoSnafu)?;
let table_ref = self.data.table_ref();
let request = alter_expr_to_request(self.data.table_id(), self.alter_expr().clone())
.context(ConvertAlterTableRequestSnafu)?;
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind, false)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
table_name: table_ref.table,
})?;
let mut new_info = table_info.clone();
new_info.meta = new_meta;
new_info.ident.version = table_info.ident.version + 1;
if let Some(column_id) = self.data.next_column_id {
new_info.meta.next_column_id = new_info.meta.next_column_id.max(column_id);
}
if let AlterKind::RenameTable { new_table_name } = &request.alter_kind {
new_info.name = new_table_name.to_string();
}
Ok(new_info)
}
/// Update table metadata.
async fn on_update_metadata(&mut self) -> Result<Status> {
pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();
let new_info = self.build_new_table_info()?;
// Safety: checked before.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
let new_info = self.build_new_table_info(&table_info_value.table_info)?;
debug!(
"starting update table: {} metadata, new table info {:?}",
"Starting update table: {} metadata, new table info {:?}",
table_ref.to_string(),
new_info
);
if let Kind::RenameTable(RenameTable { new_table_name }) = self.alter_kind()? {
self.on_update_metadata_for_rename(new_table_name.to_string())
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
self.on_update_metadata_for_rename(new_table_name.to_string(), table_info_value)
.await?;
} else {
self.on_update_metadata_for_alter(new_info.into()).await?;
self.on_update_metadata_for_alter(new_info.into(), table_info_value)
.await?;
}
info!("Updated table metadata for table {table_ref}, table_id: {table_id}");
self.data.state = AlterTableState::InvalidateTableCache;
Ok(Status::executing(true))
}
/// Broadcasts the invalidating table cache instructions.
async fn on_broadcast(&mut self) -> Result<Status> {
let alter_kind = self.alter_kind()?;
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
let cache_invalidator = &self.context.cache_invalidator;
let cache_keys = if matches!(alter_kind, Kind::RenameTable { .. }) {
vec![CacheIdent::TableName(self.data.table_ref().into())]
@@ -348,7 +191,9 @@ impl AlterTableProcedure {
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(table_id).into());
if let Ok(Kind::RenameTable(RenameTable { new_table_name })) = self.alter_kind() {
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
lock_key.push(
TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
)
@@ -403,8 +248,9 @@ impl Procedure for AlterTableProcedure {
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
/// Prepares to alter the table
/// Prepares to alter the table.
Prepare,
/// Sends alter region requests to Datanode.
SubmitAlterRegionRequests,
/// Updates table metadata.
UpdateMetadata,
@@ -412,30 +258,25 @@ enum AlterTableState {
InvalidateTableCache,
}
// The serialized data of alter table.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
cluster_id: u64,
state: AlterTableState,
task: AlterTableTask,
table_id: TableId,
/// Table info value before alteration.
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
/// Next column id of the table if the task adds columns to the table.
next_column_id: Option<ColumnId>,
table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
}
impl AlterTableData {
pub fn new(
task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
cluster_id: u64,
next_column_id: Option<ColumnId>,
) -> Self {
pub fn new(task: AlterTableTask, table_id: TableId, cluster_id: u64) -> Self {
Self {
state: AlterTableState::Prepare,
task,
table_info_value,
table_id,
cluster_id,
next_column_id,
table_info_value: None,
}
}
@@ -444,76 +285,12 @@ impl AlterTableData {
}
fn table_id(&self) -> TableId {
self.table_info().ident.table_id
self.table_id
}
fn table_info(&self) -> &RawTableInfo {
&self.table_info_value.table_info
}
}
/// Creates region proto alter kind from `table_info` and `alter_kind`.
///
/// Returns the kind and next column id if it adds new columns.
///
/// # Panics
/// Panics if kind is rename.
pub fn create_proto_alter_kind(
table_info: &RawTableInfo,
alter_kind: &Kind,
) -> Result<(Option<alter_request::Kind>, Option<ColumnId>)> {
match alter_kind {
Kind::AddColumns(x) => {
let mut next_column_id = table_info.meta.next_column_id;
let add_columns = x
.add_columns
.iter()
.map(|add_column| {
let column_def =
add_column
.column_def
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'column_def' is absent",
})?;
let column_id = next_column_id;
next_column_id += 1;
let column_def = RegionColumnDef {
column_def: Some(column_def.clone()),
column_id,
};
Ok(AddColumn {
column_def: Some(column_def),
location: add_column.location.clone(),
})
})
.collect::<Result<Vec<_>>>()?;
Ok((
Some(alter_request::Kind::AddColumns(AddColumns { add_columns })),
Some(next_column_id),
))
}
Kind::DropColumns(x) => {
let drop_columns = x
.drop_columns
.iter()
.map(|x| DropColumn {
name: x.name.clone(),
})
.collect::<Vec<_>>();
Ok((
Some(alter_request::Kind::DropColumns(DropColumns {
drop_columns,
})),
None,
))
}
Kind::RenameTable(_) => Ok((None, None)),
fn table_info(&self) -> Option<&RawTableInfo> {
self.table_info_value
.as_ref()
.map(|value| &value.table_info)
}
}

View File

@@ -0,0 +1,62 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::RenameTable;
use common_catalog::format_full_table_name;
use snafu::ensure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
impl AlterTableProcedure {
    /// Validates the alter request against the current name metadata.
    ///
    /// Checks:
    /// - The new table name doesn't exist (rename).
    /// - Table exists.
    pub(crate) async fn check_alter(&self) -> Result<()> {
        let expr = &self.data.task.alter_table;
        let (catalog, schema) = (&expr.catalog_name, &expr.schema_name);
        // Safety: Checked in `AlterTableProcedure::new`.
        let kind = expr.kind.as_ref().unwrap();
        let manager = &self.context.table_metadata_manager;

        // For a rename, the target name must not be taken already.
        if let Kind::RenameTable(RenameTable { new_table_name }) = kind {
            let target_key = TableNameKey::new(catalog, schema, new_table_name);
            ensure!(
                !manager.table_name_manager().exists(target_key).await?,
                error::TableAlreadyExistsSnafu {
                    table_name: format_full_table_name(catalog, schema, new_table_name),
                }
            );
        }

        // The table being altered must exist.
        let source_key = TableNameKey::new(catalog, schema, &expr.table_name);
        ensure!(
            manager.table_name_manager().exists(source_key).await?,
            error::TableNotFoundSnafu {
                table_name: format_full_table_name(catalog, schema, &expr.table_name),
            }
        );
        Ok(())
    }
}

View File

@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::format_full_table_name;
use snafu::OptionExt;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{self, Result};
impl AlterTableProcedure {
    /// Fetches the table info value for the procedure's table id and stores
    /// it into the procedure data for later steps.
    ///
    /// Fails with a `TableNotFound` error when no table info exists for the id.
    pub(crate) async fn fill_table_info(&mut self) -> Result<()> {
        let table_id = self.data.table_id();
        let expr = &self.data.task.alter_table;
        let full_name =
            format_full_table_name(&expr.catalog_name, &expr.schema_name, &expr.table_name);

        let value = self
            .context
            .table_metadata_manager
            .table_info_manager()
            .get(table_id)
            .await?
            .with_context(|| error::TableNotFoundSnafu {
                table_name: full_name,
            })?;
        self.data.table_info_value = Some(value);
        Ok(())
    }
}

View File

@@ -0,0 +1,258 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{
alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef,
RegionRequest, RegionRequestHeader,
};
use common_telemetry::tracing_context::TracingContext;
use snafu::OptionExt;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{InvalidProtoMsgSnafu, Result};
impl AlterTableProcedure {
    /// Builds the region request that applies this procedure's alteration
    /// to the region `region_id`.
    ///
    /// The request carries the table's current schema version so the
    /// datanode can detect outdated alterations.
    pub(crate) fn make_alter_region_request(&self, region_id: RegionId) -> Result<RegionRequest> {
        // Safety: Checked in `AlterTableProcedure::new`.
        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
        // Safety: the table info is filled before this step.
        let table_info = self.data.table_info().unwrap();

        let header = RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            ..Default::default()
        };
        let alter = AlterRequest {
            region_id: region_id.as_u64(),
            schema_version: table_info.ident.version,
            kind: create_proto_alter_kind(table_info, alter_kind)?,
        };
        Ok(RegionRequest {
            header: Some(header),
            body: Some(Body::Alter(alter)),
        })
    }
}
/// Creates region proto alter kind from `table_info` and `alter_kind`.
///
/// For added columns, column ids are assigned sequentially starting from
/// `table_info.meta.next_column_id`.
///
/// Returns `None` when the alteration (a table rename) does not affect
/// regions.
fn create_proto_alter_kind(
    table_info: &RawTableInfo,
    alter_kind: &Kind,
) -> Result<Option<alter_request::Kind>> {
    match alter_kind {
        Kind::AddColumns(x) => {
            // Assign a fresh column id to every new column.
            let mut next_column_id = table_info.meta.next_column_id;
            let add_columns = x
                .add_columns
                .iter()
                .map(|add_column| {
                    let column_def =
                        add_column
                            .column_def
                            .as_ref()
                            .context(InvalidProtoMsgSnafu {
                                err_msg: "'column_def' is absent",
                            })?;
                    let column_id = next_column_id;
                    next_column_id += 1;
                    let column_def = RegionColumnDef {
                        column_def: Some(column_def.clone()),
                        column_id,
                    };
                    Ok(AddColumn {
                        column_def: Some(column_def),
                        location: add_column.location.clone(),
                    })
                })
                .collect::<Result<Vec<_>>>()?;
            Ok(Some(alter_request::Kind::AddColumns(AddColumns {
                add_columns,
            })))
        }
        Kind::DropColumns(x) => {
            let drop_columns = x
                .drop_columns
                .iter()
                .map(|x| DropColumn {
                    name: x.name.clone(),
                })
                .collect::<Vec<_>>();
            Ok(Some(alter_request::Kind::DropColumns(DropColumns {
                drop_columns,
            })))
        }
        // Renames only touch metadata; there is nothing for a region to do.
        Kind::RenameTable(_) => Ok(None),
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::add_column_location::LocationType;
    use api::v1::alter_expr::Kind;
    use api::v1::region::region_request::Body;
    use api::v1::region::RegionColumnDef;
    use api::v1::{
        region, AddColumn, AddColumnLocation, AddColumns, AlterExpr, ColumnDataType,
        ColumnDef as PbColumnDef, SemanticType,
    };
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use store_api::storage::RegionId;

    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::test_util::columns::TestColumnDefBuilder;
    use crate::ddl::test_util::create_table::{
        build_raw_table_info_from_expr, TestCreateTableExprBuilder,
    };
    use crate::key::table_route::TableRouteValue;
    use crate::peer::Peer;
    use crate::rpc::ddl::AlterTableTask;
    use crate::rpc::router::{Region, RegionRoute};
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    // Verifies that an `AddColumns` alteration is translated into the
    // expected region alter request: correct region id, bumped schema
    // version, and the added column carrying the next free column id.
    #[tokio::test]
    async fn test_make_alter_region_request() {
        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanode_manager);
        let cluster_id = 1;
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let table_name = "foo";

        // Table with three columns: ts (time index), host (tag), cpu (field).
        let create_table = TestCreateTableExprBuilder::default()
            .column_defs([
                TestColumnDefBuilder::default()
                    .name("ts")
                    .data_type(ColumnDataType::TimestampMillisecond)
                    .semantic_type(SemanticType::Timestamp)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("host")
                    .data_type(ColumnDataType::String)
                    .semantic_type(SemanticType::Tag)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("cpu")
                    .data_type(ColumnDataType::Float64)
                    .semantic_type(SemanticType::Field)
                    .build()
                    .unwrap()
                    .into(),
            ])
            .table_id(table_id)
            .time_index("ts")
            .primary_keys(["host".into()])
            .table_name(table_name)
            .build()
            .unwrap()
            .into();
        let table_info = build_raw_table_info_from_expr(&create_table);

        // Puts a value to table name key.
        ddl_context
            .table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![RegionRoute {
                    region: Region::new_test(region_id),
                    leader_peer: Some(Peer::empty(1)),
                    follower_peers: vec![],
                    leader_status: None,
                    leader_down_since: None,
                }]),
                HashMap::new(),
            )
            .await
            .unwrap();

        // Alter task: add a nullable tag column after "my_tag2".
        let task = AlterTableTask {
            alter_table: AlterExpr {
                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: table_name.to_string(),
                kind: Some(Kind::AddColumns(AddColumns {
                    add_columns: vec![AddColumn {
                        column_def: Some(PbColumnDef {
                            name: "my_tag3".to_string(),
                            data_type: ColumnDataType::String as i32,
                            is_nullable: true,
                            default_constraint: b"hello".to_vec(),
                            semantic_type: SemanticType::Tag as i32,
                            comment: String::new(),
                            ..Default::default()
                        }),
                        location: Some(AddColumnLocation {
                            location_type: LocationType::After as i32,
                            after_column_name: "my_tag2".to_string(),
                        }),
                    }],
                })),
            },
        };
        let mut procedure =
            AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
        // Prepare fills the table info used to build the region request.
        procedure.on_prepare().await.unwrap();
        let Some(Body::Alter(alter_region_request)) =
            procedure.make_alter_region_request(region_id).unwrap().body
        else {
            unreachable!()
        };
        assert_eq!(alter_region_request.region_id, region_id.as_u64());
        assert_eq!(alter_region_request.schema_version, 1);
        // The new column gets id 3: the table already uses ids 0..=2.
        assert_eq!(
            alter_region_request.kind,
            Some(region::alter_request::Kind::AddColumns(
                region::AddColumns {
                    add_columns: vec![region::AddColumn {
                        column_def: Some(RegionColumnDef {
                            column_def: Some(PbColumnDef {
                                name: "my_tag3".to_string(),
                                data_type: ColumnDataType::String as i32,
                                is_nullable: true,
                                default_constraint: b"hello".to_vec(),
                                semantic_type: SemanticType::Tag as i32,
                                comment: String::new(),
                                ..Default::default()
                            }),
                            column_id: 3,
                        }),
                        location: Some(AddColumnLocation {
                            location_type: LocationType::After as i32,
                            after_column_name: "my_tag2".to_string(),
                        }),
                    }]
                }
            ))
        );
    }
}

View File

@@ -0,0 +1,87 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_grpc_expr::alter_expr_to_request;
use snafu::ResultExt;
use table::metadata::{RawTableInfo, TableInfo};
use table::requests::AlterKind;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{self, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::DeserializedValueWithBytes;
impl AlterTableProcedure {
    /// Builds the post-alteration table info from the current `table_info`.
    ///
    /// The returned info has its metadata rebuilt according to the alter
    /// kind, its schema version bumped by one, and — depending on the kind —
    /// its `next_column_id` advanced or its name replaced.
    pub(crate) fn build_new_table_info(&self, table_info: &RawTableInfo) -> Result<TableInfo> {
        let table_info =
            TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?;
        let table_ref = self.data.table_ref();
        let request =
            alter_expr_to_request(self.data.table_id(), self.data.task.alter_table.clone())
                .context(error::ConvertAlterTableRequestSnafu)?;

        let new_meta = table_info
            .meta
            .builder_with_alter_kind(table_ref.table, &request.alter_kind, false)
            .context(error::TableSnafu)?
            .build()
            .with_context(|_| error::BuildTableMetaSnafu {
                table_name: table_ref.table,
            })?;

        let mut new_info = table_info.clone();
        new_info.meta = new_meta;
        // Every alteration advances the schema version.
        new_info.ident.version = table_info.ident.version + 1;
        match request.alter_kind {
            AlterKind::AddColumns { columns } => {
                // Reserve one fresh column id per added column.
                new_info.meta.next_column_id += columns.len() as u32;
            }
            AlterKind::RenameTable { new_table_name } => {
                new_info.name = new_table_name.to_string();
            }
            AlterKind::DropColumns { .. } => {}
        }
        Ok(new_info)
    }

    /// Updates table metadata for rename table operation.
    pub(crate) async fn on_update_metadata_for_rename(
        &self,
        new_table_name: String,
        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
    ) -> Result<()> {
        self.context
            .table_metadata_manager
            .rename_table(current_table_info_value, new_table_name)
            .await?;
        Ok(())
    }

    /// Updates table metadata for alter table operation.
    pub(crate) async fn on_update_metadata_for_alter(
        &self,
        new_table_info: RawTableInfo,
        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
    ) -> Result<()> {
        self.context
            .table_metadata_manager
            .update_table_info(current_table_info_value, new_table_info)
            .await?;
        Ok(())
    }
}

View File

@@ -61,7 +61,7 @@ impl CreateLogicalTablesProcedure {
// Update physical table's metadata
self.context
.table_metadata_manager
.update_table_info(physical_table_info, new_table_info)
.update_table_info(&physical_table_info, new_table_info)
.await?;
// Invalid physical table cache

View File

@@ -28,7 +28,7 @@ pub struct TestAlterTableExpr {
table_name: String,
#[builder(setter(into))]
add_columns: Vec<ColumnDef>,
#[builder(setter(into))]
#[builder(setter(into, strip_option))]
new_table_name: Option<String>,
}

View File

@@ -43,6 +43,7 @@ pub struct TestCreateTableExpr {
primary_keys: Vec<String>,
create_if_not_exists: bool,
table_options: HashMap<String, String>,
#[builder(setter(into, strip_option))]
table_id: Option<TableId>,
#[builder(setter(into), default = "MITO2_ENGINE.to_string()")]
engine: String,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod alter_logical_tables;
mod alter_table;
mod create_logical_tables;
mod create_table;
mod drop_database;

View File

@@ -0,0 +1,354 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::meta::Partition;
use api::v1::region::{region_request, QueryRequest, RegionRequest};
use api::v1::{
AddColumn, AddColumns, AlterExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn,
DropColumns, SemanticType,
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc::{self};
use crate::datanode_manager::HandleResponse;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
use crate::error::Result;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::{AlterTableTask, CreateTableTask};
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
/// Builds a [CreateTableTask] fixture for table `name` (id `table_id`) with
/// columns: `ts` (timestamp index), `host` (tag / primary key), `cpu` (field).
fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask {
    let create_table = TestCreateTableExprBuilder::default()
        .column_defs([
            TestColumnDefBuilder::default()
                .name("ts")
                .data_type(ColumnDataType::TimestampMillisecond)
                .semantic_type(SemanticType::Timestamp)
                .build()
                .unwrap()
                .into(),
            TestColumnDefBuilder::default()
                .name("host")
                .data_type(ColumnDataType::String)
                .semantic_type(SemanticType::Tag)
                .build()
                .unwrap()
                .into(),
            TestColumnDefBuilder::default()
                .name("cpu")
                .data_type(ColumnDataType::Float64)
                .semantic_type(SemanticType::Field)
                .build()
                .unwrap()
                .into(),
        ])
        .table_id(table_id)
        .time_index("ts")
        .primary_keys(["host".into()])
        .table_name(name)
        .build()
        .unwrap()
        .into();
    let table_info = build_raw_table_info_from_expr(&create_table);
    CreateTableTask {
        create_table,
        // Single region
        partitions: vec![Partition {
            column_list: vec![],
            value_list: vec![],
        }],
        table_info,
    }
}
/// Builds an [AlterTableTask] that renames `table_name` to `new_table_name`.
fn test_rename_alter_table_task(table_name: &str, new_table_name: &str) -> AlterTableTask {
    let alter_table = TestAlterTableExprBuilder::default()
        .table_name(table_name)
        .new_table_name(new_table_name)
        .build()
        .unwrap()
        .into();
    AlterTableTask { alter_table }
}
// Renaming a table to a name that is already taken must fail with
// `TableAlreadyExists` during the prepare step.
#[tokio::test]
async fn test_on_prepare_table_exists_err() {
    let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(())));
    let cluster_id = 1;

    // Register table "foo" so that renaming onto it collides.
    let create_task = test_create_table_task("foo", 1024);
    ddl_context
        .table_metadata_manager
        .create_table_metadata(
            create_task.table_info.clone(),
            TableRouteValue::physical(vec![]),
            HashMap::new(),
        )
        .await
        .unwrap();

    let rename_task = test_rename_alter_table_task("non-exists", "foo");
    let mut procedure =
        AlterTableProcedure::new(cluster_id, 1024, rename_task, ddl_context).unwrap();
    let err = procedure.on_prepare().await.unwrap_err();
    assert_matches!(err.status_code(), StatusCode::TableAlreadyExists);
}
#[tokio::test]
async fn test_on_prepare_table_not_exists_err() {
    let node_manager = Arc::new(MockDatanodeManager::new(()));
    let ddl_context = new_ddl_context(node_manager);
    let cluster_id = 1;
    // No table metadata was created, so preparing the alter must fail.
    let rename_task = test_rename_alter_table_task("non-exists", "foo");
    let mut procedure =
        AlterTableProcedure::new(cluster_id, 1024, rename_task, ddl_context).unwrap();
    let err = procedure.on_prepare().await.unwrap_err();
    assert_matches!(err.status_code(), StatusCode::TableNotFound);
}
/// Test handler that forwards every `(peer, request)` pair it receives to an
/// mpsc channel so tests can inspect exactly what was sent to each datanode.
#[derive(Clone)]
pub struct DatanodeWatcher(mpsc::Sender<(Peer, RegionRequest)>);
#[async_trait::async_trait]
impl MockDatanodeHandler for DatanodeWatcher {
    /// Records the `(peer, request)` pair on the channel and replies with an
    /// empty (affected rows = 0) response.
    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
        let Self(sender) = self;
        sender.send((peer.clone(), request)).await.unwrap();
        Ok(HandleResponse::new(0))
    }

    /// Queries are never issued by these tests.
    async fn handle_query(
        &self,
        _peer: &Peer,
        _request: QueryRequest,
    ) -> Result<SendableRecordBatchStream> {
        unreachable!()
    }
}
#[tokio::test]
async fn test_on_submit_alter_request() {
let (tx, mut rx) = mpsc::channel(8);
let datanode_handler = DatanodeWatcher(tx);
let datanode_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let table_id = 1024;
let table_name = "foo";
let task = test_create_table_task(table_name, table_id);
// Puts a value to table name key.
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
]),
HashMap::new(),
)
.await
.unwrap();
let alter_table_task = AlterTableTask {
alter_table: AlterExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "my_field_column".to_string(),
}],
})),
},
};
let mut procedure =
AlterTableProcedure::new(cluster_id, table_id, alter_table_task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
procedure.submit_alter_region_requests().await.unwrap();
let check = |peer: Peer,
request: RegionRequest,
expected_peer_id: u64,
expected_region_id: RegionId| {
assert_eq!(peer.id, expected_peer_id);
let Some(region_request::Body::Alter(req)) = request.body else {
unreachable!();
};
assert_eq!(req.region_id, expected_region_id);
};
let mut results = Vec::new();
for _ in 0..3 {
let result = rx.try_recv().unwrap();
results.push(result);
}
results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id));
let (peer, request) = results.remove(0);
check(peer, request, 1, RegionId::new(table_id, 1));
let (peer, request) = results.remove(0);
check(peer, request, 2, RegionId::new(table_id, 2));
let (peer, request) = results.remove(0);
check(peer, request, 3, RegionId::new(table_id, 3));
}
#[tokio::test]
async fn test_on_update_metadata_rename() {
    let node_manager = Arc::new(MockDatanodeManager::new(()));
    let ddl_context = new_ddl_context(node_manager);
    let cluster_id = 1;
    let table_name = "foo";
    let new_table_name = "bar";
    let table_id = 1024;
    let create_task = test_create_table_task(table_name, table_id);
    // Seed the metadata of the table that will be renamed.
    ddl_context
        .table_metadata_manager
        .create_table_metadata(
            create_task.table_info.clone(),
            TableRouteValue::physical(vec![]),
            HashMap::new(),
        )
        .await
        .unwrap();
    let rename_task = test_rename_alter_table_task(table_name, new_table_name);
    let mut procedure =
        AlterTableProcedure::new(cluster_id, table_id, rename_task, ddl_context.clone()).unwrap();
    procedure.on_prepare().await.unwrap();
    procedure.on_update_metadata().await.unwrap();
    let name_manager = ddl_context.table_metadata_manager.table_name_manager();
    // The old name must have been released...
    let old_table_name_exists = name_manager
        .exists(TableNameKey::new(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
            table_name,
        ))
        .await
        .unwrap();
    assert!(!old_table_name_exists);
    // ...and the new name must resolve to the same table id.
    let value = name_manager
        .get(TableNameKey::new(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
            new_table_name,
        ))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(value.table_id(), table_id);
}
#[tokio::test]
async fn test_on_update_metadata_add_columns() {
    let node_manager = Arc::new(MockDatanodeManager::new(()));
    let ddl_context = new_ddl_context(node_manager);
    let cluster_id = 1;
    let table_name = "foo";
    let table_id = 1024;
    let create_task = test_create_table_task(table_name, table_id);
    // Seed the metadata of the table that will be altered.
    ddl_context
        .table_metadata_manager
        .create_table_metadata(
            create_task.table_info.clone(),
            TableRouteValue::physical(vec![]),
            HashMap::new(),
        )
        .await
        .unwrap();
    // Alter task: add a nullable string tag column `my_tag3`.
    let task = AlterTableTask {
        alter_table: AlterExpr {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: table_name.to_string(),
            kind: Some(Kind::AddColumns(AddColumns {
                add_columns: vec![AddColumn {
                    column_def: Some(PbColumnDef {
                        name: "my_tag3".to_string(),
                        data_type: ColumnDataType::String as i32,
                        semantic_type: SemanticType::Tag as i32,
                        is_nullable: true,
                        ..Default::default()
                    }),
                    location: None,
                }],
            })),
        },
    };
    let mut procedure =
        AlterTableProcedure::new(cluster_id, table_id, task, ddl_context.clone()).unwrap();
    procedure.on_prepare().await.unwrap();
    procedure.on_update_metadata().await.unwrap();
    let table_info = ddl_context
        .table_metadata_manager
        .table_info_manager()
        .get(table_id)
        .await
        .unwrap()
        .unwrap()
        .into_inner()
        .table_info;
    // `next_column_id` must track the widened schema.
    assert_eq!(
        table_info.meta.next_column_id,
        table_info.meta.schema.column_schemas.len() as u32
    );
}

View File

@@ -206,13 +206,12 @@ impl DdlManager {
pub async fn submit_alter_table_task(
&self,
cluster_id: ClusterId,
table_id: TableId,
alter_table_task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure =
AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context)?;
let procedure = AlterTableProcedure::new(cluster_id, table_id, alter_table_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -442,12 +441,12 @@ async fn handle_alter_table_task(
})?
.table_id();
let (table_info_value, table_route_value) = ddl_manager
let table_route_value = ddl_manager
.table_metadata_manager()
.get_full_table_info(table_id)
.await?;
let table_route_value = table_route_value
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.await?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
@@ -458,12 +457,8 @@ async fn handle_alter_table_task(
}
);
let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
table: table_ref.to_string(),
})?;
let (id, _) = ddl_manager
.submit_alter_table_task(cluster_id, alter_table_task, table_info_value)
.submit_alter_table_task(cluster_id, table_id, alter_table_task)
.await?;
info!("Table: {table_id} is altered via procedure_id {id:?}");

View File

@@ -595,7 +595,7 @@ impl TableMetadataManager {
/// and the new `TableNameKey` MUST be empty.
pub async fn rename_table(
&self,
current_table_info_value: DeserializedValueWithBytes<TableInfoValue>,
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
new_table_name: String,
) -> Result<()> {
let current_table_info = &current_table_info_value.table_info;
@@ -629,7 +629,7 @@ impl TableMetadataManager {
// Updates table info.
let (update_table_info_txn, on_update_table_info_failure) = self
.table_info_manager()
.build_update_txn(table_id, &current_table_info_value, &new_table_info_value)?;
.build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
@@ -653,7 +653,7 @@ impl TableMetadataManager {
/// Updates table info and returns an error if different metadata exists.
pub async fn update_table_info(
&self,
current_table_info_value: DeserializedValueWithBytes<TableInfoValue>,
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
new_table_info: RawTableInfo,
) -> Result<()> {
let table_id = current_table_info_value.table_info.ident.table_id;
@@ -663,7 +663,7 @@ impl TableMetadataManager {
// Updates table info.
let (update_table_info_txn, on_update_table_info_failure) = self
.table_info_manager()
.build_update_txn(table_id, &current_table_info_value, &new_table_info_value)?;
.build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
let r = self.kv_backend.txn(update_table_info_txn).await?;
@@ -1229,12 +1229,12 @@ mod tests {
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
table_metadata_manager
.rename_table(table_info_value.clone(), new_table_name.clone())
.rename_table(&table_info_value, new_table_name.clone())
.await
.unwrap();
// if remote metadata was updated, it should be ok.
table_metadata_manager
.rename_table(table_info_value.clone(), new_table_name.clone())
.rename_table(&table_info_value, new_table_name.clone())
.await
.unwrap();
let mut modified_table_info = table_info.clone();
@@ -1244,7 +1244,7 @@ mod tests {
// if the table_info_value is wrong, it should return an error.
// The ABA problem.
assert!(table_metadata_manager
.rename_table(modified_table_info_value.clone(), new_table_name.clone())
.rename_table(&modified_table_info_value, new_table_name.clone())
.await
.is_err());
@@ -1302,12 +1302,12 @@ mod tests {
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
// should be ok.
table_metadata_manager
.update_table_info(current_table_info_value.clone(), new_table_info.clone())
.update_table_info(&current_table_info_value, new_table_info.clone())
.await
.unwrap();
// if table info was updated, it should be ok.
table_metadata_manager
.update_table_info(current_table_info_value.clone(), new_table_info.clone())
.update_table_info(&current_table_info_value, new_table_info.clone())
.await
.unwrap();
@@ -1329,7 +1329,7 @@ mod tests {
// if the current_table_info_value is wrong, it should return an error.
// The ABA problem.
assert!(table_metadata_manager
.update_table_info(wrong_table_info_value, new_table_info)
.update_table_info(&wrong_table_info_value, new_table_info)
.await
.is_err())
}

View File

@@ -474,6 +474,16 @@ pub struct AlterTableTask {
}
impl AlterTableTask {
pub fn validate(&self) -> Result<()> {
self.alter_table
.kind
.as_ref()
.context(error::UnexpectedSnafu {
err_msg: "'kind' is absent",
})?;
Ok(())
}
pub fn table_ref(&self) -> TableReference {
TableReference {
catalog: &self.alter_table.catalog_name,

View File

@@ -15,19 +15,13 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use api::v1::add_column_location::LocationType;
use api::v1::alter_expr::Kind;
use api::v1::meta::Partition;
use api::v1::region::region_request::{self, Body as PbRegionRequest};
use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef};
use api::v1::{
region, AddColumn, AddColumnLocation, AddColumns, AlterExpr, ColumnDataType,
ColumnDef as PbColumnDef, DropColumn, DropColumns, SemanticType,
};
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType};
use client::client_manager::DatanodeClients;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::alter_table::AlterTableProcedure;
use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState};
use common_meta::ddl::create_table::*;
use common_meta::ddl::drop_table::executor::DropTableExecutor;
@@ -39,7 +33,7 @@ use common_meta::ddl::test_util::create_table::{
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use common_meta::key::DeserializedValueWithBytes;
use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DropTableTask};
use common_meta::rpc::ddl::{CreateTableTask, DropTableTask};
use common_meta::rpc::router::{find_leaders, RegionRoute};
use common_meta::table_name::TableName;
use common_procedure::Status;
@@ -375,138 +369,3 @@ async fn test_on_datanode_drop_regions() {
assert!(expected_dropped_regions.lock().unwrap().is_empty());
}
#[test]
fn test_create_alter_region_request() {
    // Alter task: add a nullable tag column `my_tag3`, placed right after
    // `my_tag2`, carrying a default-constraint payload of b"hello".
    let alter_table_task = AlterTableTask {
        alter_table: AlterExpr {
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            table_name: "my_table".to_string(),
            kind: Some(Kind::AddColumns(AddColumns {
                add_columns: vec![AddColumn {
                    column_def: Some(PbColumnDef {
                        name: "my_tag3".to_string(),
                        data_type: ColumnDataType::String as i32,
                        is_nullable: true,
                        default_constraint: b"hello".to_vec(),
                        semantic_type: SemanticType::Tag as i32,
                        comment: String::new(),
                        ..Default::default()
                    }),
                    location: Some(AddColumnLocation {
                        location_type: LocationType::After as i32,
                        after_column_name: "my_tag2".to_string(),
                    }),
                }],
            })),
        },
    };
    // NOTE(review): this constructs the procedure with the pre-refactor
    // signature (task + deserialized table-info value); confirm against the
    // current `AlterTableProcedure::new` API before reusing.
    let procedure = AlterTableProcedure::new(
        1,
        alter_table_task,
        DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
        test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
    )
    .unwrap();
    let region_id = RegionId::new(42, 1);
    let alter_region_request = procedure.create_alter_region_request(region_id).unwrap();
    // The region request targets the requested region and carries schema
    // version 1 (taken from `test_data::new_table_info()`).
    assert_eq!(alter_region_request.region_id, region_id.as_u64());
    assert_eq!(alter_region_request.schema_version, 1);
    // The new column keeps the original definition and is assigned
    // column id 3 plus the same `After("my_tag2")` location.
    assert_eq!(
        alter_region_request.kind,
        Some(region::alter_request::Kind::AddColumns(
            region::AddColumns {
                add_columns: vec![region::AddColumn {
                    column_def: Some(RegionColumnDef {
                        column_def: Some(PbColumnDef {
                            name: "my_tag3".to_string(),
                            data_type: ColumnDataType::String as i32,
                            is_nullable: true,
                            default_constraint: b"hello".to_vec(),
                            semantic_type: SemanticType::Tag as i32,
                            comment: String::new(),
                            ..Default::default()
                        }),
                        column_id: 3,
                    }),
                    location: Some(AddColumnLocation {
                        location_type: LocationType::After as i32,
                        after_column_name: "my_tag2".to_string(),
                    }),
                }]
            }
        ))
    );
}
#[tokio::test]
async fn test_submit_alter_region_requests() {
    // Alter task: drop `my_field_column` from `my_catalog.my_schema.my_table`.
    let alter_table_task = AlterTableTask {
        alter_table: AlterExpr {
            catalog_name: "my_catalog".to_string(),
            schema_name: "my_schema".to_string(),
            table_name: "my_table".to_string(),
            kind: Some(Kind::DropColumns(DropColumns {
                drop_columns: vec![DropColumn {
                    name: "my_field_column".to_string(),
                }],
            })),
        },
    };
    // Echo server forwards every region request it receives to `rx`.
    let (region_server, mut rx) = EchoRegionServer::new();
    let region_routes = test_data::new_region_routes();
    let datanode_manager = new_datanode_manager(&region_server, &region_routes).await;
    let context = test_data::new_ddl_context(datanode_manager);
    let table_info = test_data::new_table_info();
    context
        .table_metadata_manager
        .create_table_metadata(
            table_info.clone(),
            TableRouteValue::physical(region_routes),
            HashMap::default(),
        )
        .await
        .unwrap();
    // NOTE(review): this constructs the procedure with the pre-refactor
    // signature (task + deserialized table-info value); confirm against the
    // current `AlterTableProcedure::new` API before reusing.
    let mut procedure = AlterTableProcedure::new(
        1,
        alter_table_task,
        DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)),
        context,
    )
    .unwrap();
    // Every region of table 42 is expected to receive one alter request.
    let expected_altered_regions = Arc::new(Mutex::new(HashSet::from([
        RegionId::new(42, 1),
        RegionId::new(42, 2),
        RegionId::new(42, 3),
    ])));
    let handle = tokio::spawn({
        let expected_altered_regions = expected_altered_regions.clone();
        let mut max_recv = expected_altered_regions.lock().unwrap().len();
        async move {
            // Tick off each region as its alter request arrives; stop after
            // the expected number of requests.
            while let Some(region_request::Body::Alter(request)) = rx.recv().await {
                let region_id = RegionId::from_u64(request.region_id);
                expected_altered_regions.lock().unwrap().remove(&region_id);
                max_recv -= 1;
                if max_recv == 0 {
                    break;
                }
            }
        }
    });
    let status = procedure.submit_alter_region_requests().await.unwrap();
    assert!(matches!(status, Status::Executing { persist: true }));
    handle.await.unwrap();
    // All expected regions were altered.
    assert!(expected_altered_regions.lock().unwrap().is_empty());
}