refactor: copy alter logic to SchemaHelper

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-06-19 21:10:20 +08:00
committed by Lei, HUANG
parent 34875c0346
commit 14f3a4ab05
2 changed files with 273 additions and 61 deletions

View File

@@ -14,18 +14,24 @@
//! Utilities to deal with table schemas.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::{ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
use catalog::CatalogManagerRef;
use common_catalog::consts::{default_engine, is_readonly_schema};
use common_catalog::consts::{
default_engine, is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use common_catalog::format_full_table_name;
use common_meta::cache_invalidator::{CacheInvalidatorRef, Context};
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::{TableMetadataManagerRef, NAME_PATTERN};
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::Partition;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing;
use lazy_static::lazy_static;
use regex::Regex;
@@ -36,18 +42,19 @@ use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use table::dist_table::DistTable;
use table::metadata::{RawTableInfo, TableInfo};
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_name::TableName;
use table::table_reference::TableReference;
use table::TableRef;
use crate::error::{
CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, EmptyDdlExprSnafu,
ExecuteDdlSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, Result, SchemaNotFoundSnafu,
SchemaReadOnlySnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, UnexpectedSnafu,
ExecuteDdlSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, InvalidateTableCacheSnafu,
Result, SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::insert::build_create_table_expr;
use crate::statement::ddl::{create_table_info, parse_partitions};
use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter};
// TODO(yingwen): Replaces operator::statement::ddl::NAME_PATTERN_REG
lazy_static! {
@@ -56,10 +63,12 @@ lazy_static! {
}
/// Helper to query and manipulate table schemas.
///
/// It bundles the handles needed by the schema-changing methods on this type:
/// table lookup, table metadata access, DDL submission and cache invalidation.
#[derive(Clone)]
pub struct SchemaHelper {
/// Resolves tables by catalog, schema and table name.
catalog_manager: CatalogManagerRef,
/// Gives access to table routes (physical table ids) and stored table infos.
table_metadata_manager: TableMetadataManagerRef,
/// Executor that DDL tasks (e.g. alter table) are submitted to.
procedure_executor: ProcedureExecutorRef,
/// Invalidates cached table entries after a table has been altered.
cache_invalidator: CacheInvalidatorRef,
}
impl SchemaHelper {
@@ -68,11 +77,13 @@ impl SchemaHelper {
catalog_manager: CatalogManagerRef,
table_metadata_manager: TableMetadataManagerRef,
procedure_executor: ProcedureExecutorRef,
cache_invalidator: CacheInvalidatorRef,
) -> Self {
Self {
catalog_manager,
table_metadata_manager,
procedure_executor,
cache_invalidator,
}
}
@@ -360,6 +371,190 @@ impl SchemaHelper {
.collect())
}
// TODO(yingwen): Replaces StatementExecutor::alter_table_inner
/// Applies the alteration described by an [AlterTableExpr] to a single table.
///
/// The flow is:
/// 1. Reject the request if the target schema is read-only.
/// 2. Resolve the table through the catalog manager; a missing table is
///    reported as a table-not-found error.
/// 3. Ask `verify_alter` whether anything has to change; if not, return early.
/// 4. Submit either an "alter table" task (physical table) or an
///    "alter logical tables" task (logical table) to the procedure executor.
/// 5. Invalidate the cache entries of every table touched by the alter.
///
/// The returned [Output] always reports `0` affected rows.
///
/// NOTE(review): an empty catalog/schema name falls back to the global default
/// constants here, whereas `alter_logical_tables` falls back to the query
/// context -- confirm this difference is intended.
#[tracing::instrument(skip_all)]
pub async fn alter_table_by_expr(
    &self,
    expr: AlterTableExpr,
    query_context: QueryContextRef,
) -> Result<Output> {
    ensure!(
        !is_readonly_schema(&expr.schema_name),
        SchemaReadOnlySnafu {
            name: expr.schema_name.clone()
        }
    );

    // Fill in default names for the parts the expression leaves empty.
    let catalog_name = match expr.catalog_name.as_str() {
        "" => DEFAULT_CATALOG_NAME.to_string(),
        name => name.to_string(),
    };
    let schema_name = match expr.schema_name.as_str() {
        "" => DEFAULT_SCHEMA_NAME.to_string(),
        name => name.to_string(),
    };
    let table_name = expr.table_name.clone();

    let maybe_table = self
        .catalog_manager
        .table(
            &catalog_name,
            &schema_name,
            &table_name,
            Some(&query_context),
        )
        .await
        .context(CatalogSnafu)?;
    let table = maybe_table.with_context(|| TableNotFoundSnafu {
        table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
    })?;

    let table_id = table.table_info().ident.table_id;
    // Nothing to do when the verification says the alter is unnecessary.
    if !verify_alter(table_id, table.table_info(), expr.clone())? {
        return Ok(Output::new_with_affected_rows(0));
    }

    common_telemetry::info!(
        "Table info before alter is {:?}, expr: {:?}",
        table.table_info(),
        expr
    );

    let physical_table_id = self
        .table_metadata_manager
        .table_route_manager()
        .get_physical_table_id(table_id)
        .await
        .context(TableMetadataManagerSnafu)?;
    // A table whose physical table id differs from its own id is a logical table.
    let is_logical = physical_table_id != table_id;

    // Collect the cache entries to invalidate. For a logical table the
    // physical table's id (and, if known, its name) must be invalidated too.
    let mut invalidate_keys = Vec::with_capacity(4);
    if is_logical {
        invalidate_keys.push(CacheIdent::TableId(physical_table_id));
    }
    invalidate_keys.push(CacheIdent::TableId(table_id));
    invalidate_keys.push(CacheIdent::TableName(TableName::new(
        catalog_name,
        schema_name,
        table_name,
    )));
    if is_logical {
        let physical = self
            .table_metadata_manager
            .table_info_manager()
            .get(physical_table_id)
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|value| value.into_inner());
        if let Some(physical) = physical {
            invalidate_keys.push(CacheIdent::TableName(TableName::new(
                physical.table_info.catalog_name,
                physical.table_info.schema_name,
                physical.table_info.name,
            )));
        }
    }

    let task = if is_logical {
        DdlTask::new_alter_logical_tables(vec![expr])
    } else {
        DdlTask::new_alter_table(expr)
    };
    let req = SubmitDdlTaskRequest {
        query_context,
        task,
    };

    self.procedure_executor
        .submit_ddl_task(&ExecutorContext::default(), req)
        .await
        .context(ExecuteDdlSnafu)?;

    // Invalidates local cache ASAP.
    self.cache_invalidator
        .invalidate(&Context::default(), &invalidate_keys)
        .await
        .context(InvalidateTableCacheSnafu)?;

    Ok(Output::new_with_affected_rows(0))
}
// TODO(yingwen): Replaces StatementExecutor::alter_logical_tables
/// Alters a batch of logical tables.
///
/// Every expression is resolved to its table (empty catalog/schema names fall
/// back to the current catalog/schema of `query_context`), the expressions are
/// grouped by the physical table id of their table, and one procedure is
/// submitted per group. All procedures are awaited together; the first failure
/// is returned as the error.
///
/// Fails if `alter_table_exprs` is empty or if any referenced table cannot be
/// found. On success the returned [Output] reports `0` affected rows.
pub async fn alter_logical_tables(
    &self,
    alter_table_exprs: Vec<AlterTableExpr>,
    query_context: QueryContextRef,
) -> Result<Output> {
    let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
    ensure!(
        !alter_table_exprs.is_empty(),
        EmptyDdlExprSnafu {
            name: "alter logical tables"
        }
    );

    // Physical table id -> alter expressions of the logical tables on top of it.
    let mut exprs_by_physical_table: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
    for expr in alter_table_exprs {
        // Resolve the table through the catalog manager to learn its table id.
        let catalog = if expr.catalog_name.is_empty() {
            query_context.current_catalog()
        } else {
            &expr.catalog_name
        };
        let schema = if expr.schema_name.is_empty() {
            query_context.current_schema()
        } else {
            expr.schema_name.clone()
        };
        let table_name = &expr.table_name;

        let maybe_table = self
            .catalog_manager
            .table(catalog, &schema, table_name, Some(&query_context))
            .await
            .context(CatalogSnafu)?;
        let table = maybe_table.with_context(|| TableNotFoundSnafu {
            table_name: format_full_table_name(catalog, &schema, table_name),
        })?;

        let table_id = table.table_info().ident.table_id;
        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        exprs_by_physical_table
            .entry(physical_table_id)
            .or_default()
            .push(expr);
    }

    // One procedure per physical table; all of them are awaited together.
    let procedures = exprs_by_physical_table
        .into_values()
        .map(|exprs| self.alter_logical_tables_procedure(exprs, query_context.clone()));
    let _responses = futures::future::try_join_all(procedures).await?;

    Ok(Output::new_with_affected_rows(0))
}
/// Borrows the catalog manager this helper uses to resolve tables.
pub(crate) fn catalog_manager(&self) -> &CatalogManagerRef {
    &self.catalog_manager
}
// TODO(yingwen): Replace StatementExecutor::create_table_procedure
/// Submits a procedure to create a non-logical table.
async fn create_table_procedure(
@@ -399,4 +594,22 @@ impl SchemaHelper {
.await
.context(ExecuteDdlSnafu)
}
// TODO(yingwen): Replace StatementExecutor::alter_logical_tables_procedure
/// Wraps `tables_data` into a single "alter logical tables" DDL task and
/// submits it to the procedure executor.
///
/// Returns the executor's response, or an execute-DDL error if the submission
/// fails.
async fn alter_logical_tables_procedure(
    &self,
    tables_data: Vec<AlterTableExpr>,
    query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
    let task = DdlTask::new_alter_logical_tables(tables_data);
    let executor_context = ExecutorContext::default();

    self.procedure_executor
        .submit_ddl_task(
            &executor_context,
            SubmitDdlTaskRequest {
                query_context,
                task,
            },
        )
        .await
        .context(ExecuteDdlSnafu)
}
}

View File

@@ -1152,60 +1152,6 @@ impl StatementExecutor {
Ok(Output::new_with_affected_rows(0))
}
/// Verifies an alter and returns whether it is necessary to perform the alter.
///
/// # Returns
///
/// Returns true if the alter need to be porformed; otherwise, it returns false.
fn verify_alter(
&self,
table_id: TableId,
table_info: Arc<TableInfo>,
expr: AlterTableExpr,
) -> Result<bool> {
let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
.context(AlterExprToRequestSnafu)?;
let AlterTableRequest {
table_name,
alter_kind,
..
} = &request;
if let AlterKind::RenameTable { new_table_name } = alter_kind {
ensure!(
NAME_PATTERN_REG.is_match(new_table_name),
error::UnexpectedSnafu {
violated: format!("Invalid table name: {}", new_table_name)
}
);
} else if let AlterKind::AddColumns { columns } = alter_kind {
// If all the columns are marked as add_if_not_exists and they already exist in the table,
// there is no need to perform the alter.
let column_names: HashSet<_> = table_info
.meta
.schema
.column_schemas()
.iter()
.map(|schema| &schema.name)
.collect();
if columns.iter().all(|column| {
column_names.contains(&column.column_schema.name) && column.add_if_not_exists
}) {
return Ok(false);
}
}
let _ = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
Ok(true)
}
#[tracing::instrument(skip_all)]
pub async fn alter_table(
&self,
@@ -1258,7 +1204,7 @@ impl StatementExecutor {
})?;
let table_id = table.table_info().ident.table_id;
let need_alter = self.verify_alter(table_id, table.table_info(), expr.clone())?;
let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
if !need_alter {
return Ok(Output::new_with_affected_rows(0));
}
@@ -1585,6 +1531,59 @@ impl StatementExecutor {
}
}
/// Checks that an alter expression is valid for the given table and reports
/// whether the alter has to be executed at all.
///
/// # Returns
///
/// Returns `true` if the alter needs to be performed; otherwise, it returns
/// `false`. The `false` case is an add-columns request in which every column
/// already exists in the table and is marked `add_if_not_exists`.
///
/// # Errors
///
/// Returns an error if the expression cannot be converted into an alter
/// request, if a rename uses a table name that does not match the name
/// pattern, or if building the altered table meta fails.
pub(crate) fn verify_alter(
    table_id: TableId,
    table_info: Arc<TableInfo>,
    expr: AlterTableExpr,
) -> Result<bool> {
    let request: AlterTableRequest =
        common_grpc_expr::alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?;
    let table_name = &request.table_name;

    match &request.alter_kind {
        AlterKind::RenameTable { new_table_name } => {
            ensure!(
                NAME_PATTERN_REG.is_match(new_table_name),
                error::UnexpectedSnafu {
                    violated: format!("Invalid table name: {}", new_table_name)
                }
            );
        }
        AlterKind::AddColumns { columns } => {
            // If all the columns are marked as add_if_not_exists and they already
            // exist in the table, there is no need to perform the alter.
            let existing: HashSet<_> = table_info
                .meta
                .schema
                .column_schemas()
                .iter()
                .map(|schema| &schema.name)
                .collect();
            let all_present = columns.iter().all(|column| {
                column.add_if_not_exists && existing.contains(&column.column_schema.name)
            });
            if all_present {
                return Ok(false);
            }
        }
        // Other alter kinds have no extra pre-checks here.
        _ => {}
    }

    // Build the altered table meta only to surface errors; the result is discarded.
    let _ = table_info
        .meta
        .builder_with_alter_kind(table_name, &request.alter_kind)
        .context(error::TableSnafu)?
        .build()
        .context(error::BuildTableMetaSnafu { table_name })?;

    Ok(true)
}
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
pub(crate) fn parse_partitions(
create_table: &CreateTableExpr,