feat: metric engine support alter (#3098)

* feat: metric engine support alter

* chore: by comment

* feat: get physical table route for frontend
This commit is contained in:
JeremyHi
2024-01-05 17:46:39 +08:00
committed by GitHub
parent e0a43f37d7
commit bd1a5dc265
5 changed files with 137 additions and 52 deletions

View File

@@ -40,9 +40,7 @@ use table::requests::AlterKind;
use crate::cache_invalidator::Context; use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_operate_region_error; use crate::ddl::utils::handle_operate_region_error;
use crate::ddl::DdlContext; use crate::ddl::DdlContext;
use crate::error::{ use crate::error::{self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result};
self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result, TableRouteNotFoundSnafu,
};
use crate::key::table_info::TableInfoValue; use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey; use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes; use crate::key::DeserializedValueWithBytes;
@@ -65,6 +63,7 @@ impl AlterTableProcedure {
cluster_id: u64, cluster_id: u64,
task: AlterTableTask, task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>, table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
context: DdlContext, context: DdlContext,
) -> Result<Self> { ) -> Result<Self> {
let alter_kind = task let alter_kind = task
@@ -84,7 +83,13 @@ impl AlterTableProcedure {
Ok(Self { Ok(Self {
context, context,
data: AlterTableData::new(task, table_info_value, cluster_id, next_column_id), data: AlterTableData::new(
task,
table_info_value,
physical_table_name,
cluster_id,
next_column_id,
),
kind, kind,
}) })
} }
@@ -182,23 +187,19 @@ impl AlterTableProcedure {
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> { pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id(); let table_id = self.data.table_id();
let (_, physical_table_route) = self
let table_route = self
.context .context
.table_metadata_manager .table_metadata_manager
.table_route_manager() .table_route_manager()
.get(table_id) .get_physical_table_route(table_id)
.await? .await?;
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = table_route.region_routes()?;
let leaders = find_leaders(region_routes); let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len()); let mut alter_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders { for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await; let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode); let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
for region in regions { for region in regions {
let region_id = RegionId::new(table_id, region); let region_id = RegionId::new(table_id, region);
@@ -335,13 +336,24 @@ impl AlterTableProcedure {
} }
fn lock_key_inner(&self) -> Vec<String> { fn lock_key_inner(&self) -> Vec<String> {
let mut lock_key = vec![];
if let Some(physical_table_name) = self.data.physical_table_name() {
let physical_table_key = common_catalog::format_full_table_name(
&physical_table_name.catalog_name,
&physical_table_name.schema_name,
&physical_table_name.table_name,
);
lock_key.push(physical_table_key);
}
let table_ref = self.data.table_ref(); let table_ref = self.data.table_ref();
let table_key = common_catalog::format_full_table_name( let table_key = common_catalog::format_full_table_name(
table_ref.catalog, table_ref.catalog,
table_ref.schema, table_ref.schema,
table_ref.table, table_ref.table,
); );
let mut lock_key = vec![table_key]; lock_key.push(table_key);
if let Ok(Kind::RenameTable(RenameTable { new_table_name })) = self.alter_kind() { if let Ok(Kind::RenameTable(RenameTable { new_table_name })) = self.alter_kind() {
lock_key.push(common_catalog::format_full_table_name( lock_key.push(common_catalog::format_full_table_name(
@@ -415,6 +427,8 @@ pub struct AlterTableData {
task: AlterTableTask, task: AlterTableTask,
/// Table info value before alteration. /// Table info value before alteration.
table_info_value: DeserializedValueWithBytes<TableInfoValue>, table_info_value: DeserializedValueWithBytes<TableInfoValue>,
/// Physical table name, if the table to alter is a logical table.
physical_table_name: Option<TableName>,
cluster_id: u64, cluster_id: u64,
/// Next column id of the table if the task adds columns to the table. /// Next column id of the table if the task adds columns to the table.
next_column_id: Option<ColumnId>, next_column_id: Option<ColumnId>,
@@ -424,6 +438,7 @@ impl AlterTableData {
pub fn new( pub fn new(
task: AlterTableTask, task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>, table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
cluster_id: u64, cluster_id: u64,
next_column_id: Option<ColumnId>, next_column_id: Option<ColumnId>,
) -> Self { ) -> Self {
@@ -431,6 +446,7 @@ impl AlterTableData {
state: AlterTableState::Prepare, state: AlterTableState::Prepare,
task, task,
table_info_value, table_info_value,
physical_table_name,
cluster_id, cluster_id,
next_column_id, next_column_id,
} }
@@ -447,6 +463,10 @@ impl AlterTableData {
fn table_info(&self) -> &RawTableInfo { fn table_info(&self) -> &RawTableInfo {
&self.table_info_value.table_info &self.table_info_value.table_info
} }
fn physical_table_name(&self) -> Option<&TableName> {
self.physical_table_name.as_ref()
}
} }
/// Creates region proto alter kind from `table_info` and `alter_kind`. /// Creates region proto alter kind from `table_info` and `alter_kind`.

View File

@@ -46,6 +46,8 @@ use crate::rpc::ddl::{
TruncateTableTask, TruncateTableTask,
}; };
use crate::rpc::router::RegionRoute; use crate::rpc::router::RegionRoute;
use crate::table_name::TableName;
pub type DdlManagerRef = Arc<DdlManager>; pub type DdlManagerRef = Arc<DdlManager>;
/// The [DdlManager] provides the ability to execute Ddl. /// The [DdlManager] provides the ability to execute Ddl.
@@ -160,11 +162,17 @@ impl DdlManager {
cluster_id: u64, cluster_id: u64,
alter_table_task: AlterTableTask, alter_table_task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>, table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_name: Option<TableName>,
) -> Result<ProcedureId> { ) -> Result<ProcedureId> {
let context = self.create_context(); let context = self.create_context();
let procedure = let procedure = AlterTableProcedure::new(
AlterTableProcedure::new(cluster_id, alter_table_task, table_info_value, context)?; cluster_id,
alter_table_task,
table_info_value,
physical_table_name,
context,
)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -327,8 +335,38 @@ async fn handle_alter_table_task(
table_name: table_ref.to_string(), table_name: table_ref.to_string(),
})?; })?;
let physical_table_id = ddl_manager
.table_metadata_manager()
.table_route_manager()
.get_physical_table_id(table_id)
.await?;
let physical_table_name = if physical_table_id == table_id {
None
} else {
let physical_table_info = &ddl_manager
.table_metadata_manager()
.table_info_manager()
.get(physical_table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.table_info;
Some(TableName {
catalog_name: physical_table_info.catalog_name.clone(),
schema_name: physical_table_info.schema_name.clone(),
table_name: physical_table_info.name.clone(),
})
};
let id = ddl_manager let id = ddl_manager
.submit_alter_table_task(cluster_id, alter_table_task, table_info_value) .submit_alter_table_task(
cluster_id,
alter_table_task,
table_info_value,
physical_table_name,
)
.await?; .await?;
info!("Table: {table_id} is altered via procedure_id {id:?}"); info!("Table: {table_id} is altered via procedure_id {id:?}");

View File

@@ -16,12 +16,14 @@ use std::collections::HashMap;
use std::fmt::Display; use std::fmt::Display;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber}; use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId; use table::metadata::TableId;
use super::{DeserializedValueWithBytes, TableMetaValue}; use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{Result, SerdeJsonSnafu, UnexpectedLogicalRouteTableSnafu}; use crate::error::{
Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX}; use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef; use crate::kv_backend::KvBackendRef;
@@ -334,6 +336,54 @@ impl TableRouteManager {
.transpose() .transpose()
} }
pub async fn get_physical_table_id(
&self,
logical_or_physical_table_id: TableId,
) -> Result<TableId> {
let table_route = self
.get(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?
.into_inner();
match table_route {
TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
}
}
pub async fn get_physical_table_route(
&self,
logical_or_physical_table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)> {
let table_route = self
.get(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?
.into_inner();
match table_route {
TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route =
self.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
Ok((
physical_table_id,
physical_table_route.physical_table_route().clone(),
))
}
}
}
/// It may return a subset of the `table_ids`. /// It may return a subset of the `table_ids`.
pub async fn batch_get( pub async fn batch_get(
&self, &self,

View File

@@ -313,6 +313,7 @@ fn test_create_alter_region_request() {
1, 1,
alter_table_task, alter_table_task,
DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())), DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
None,
test_data::new_ddl_context(Arc::new(DatanodeClients::default())), test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
) )
.unwrap(); .unwrap();
@@ -383,6 +384,7 @@ async fn test_submit_alter_region_requests() {
1, 1,
alter_table_task, alter_table_task,
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)), DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)),
None,
context, context,
) )
.unwrap(); .unwrap();

View File

@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
use api::v1::Rows; use api::v1::Rows;
use common_meta::key::table_route::{TableRouteManager, TableRouteValue}; use common_meta::key::table_route::TableRouteManager;
use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer; use common_meta::peer::Peer;
use common_meta::rpc::router; use common_meta::rpc::router;
@@ -29,7 +29,7 @@ use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId; use table::metadata::TableId;
use crate::columns::RangeColumnsPartitionRule; use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, InvalidTableRouteDataSnafu, Result}; use crate::error::{FindLeaderSnafu, Result};
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr}; use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule; use crate::range::RangePartitionRule;
use crate::splitter::RowSplitter; use crate::splitter::RowSplitter;
@@ -65,38 +65,13 @@ impl PartitionRuleManager {
} }
} }
/// Find table route of given table name.
async fn find_table_route(&self, table_id: TableId) -> Result<TableRouteValue> {
let route = self
.table_route_manager
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
Ok(route)
}
async fn find_region_routes(&self, table_id: TableId) -> Result<Vec<RegionRoute>> { async fn find_region_routes(&self, table_id: TableId) -> Result<Vec<RegionRoute>> {
let table_route = self.find_table_route(table_id).await?; let (_, route) = self
.table_route_manager
let region_routes = match table_route { .get_physical_table_route(table_id)
TableRouteValue::Physical(x) => x.region_routes, .await
.context(error::TableRouteManagerSnafu)?;
TableRouteValue::Logical(x) => { Ok(route.region_routes)
let TableRouteValue::Physical(physical_table_route) =
self.find_table_route(x.physical_table_id()).await?
else {
return InvalidTableRouteDataSnafu {
table_id: x.physical_table_id(),
err_msg: "expected to be a physical table route",
}
.fail();
};
physical_table_route.region_routes
}
};
Ok(region_routes)
} }
pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> { pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {