refactor: define schema_helper mod to handle schema creation

It has the same logic as Inserter and StatementExecutor

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-06-19 20:32:28 +08:00
committed by Lei, HUANG
parent 1d07864b29
commit 34875c0346
5 changed files with 408 additions and 4 deletions

View File

@@ -1011,7 +1011,8 @@ fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
Ok(())
}
fn build_create_table_expr(
/// Builds a [CreateTableExpr] for the given table and schema.
pub(crate) fn build_create_table_expr(
table: &TableReference,
request_schema: &[ColumnSchema],
engine: &str,

View File

@@ -27,6 +27,7 @@ pub mod procedure;
pub mod region_req_factory;
pub mod req_convert;
pub mod request;
pub mod schema_helper;
pub mod statement;
pub mod table;
#[cfg(test)]

View File

@@ -0,0 +1,402 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to deal with table schemas.
use std::sync::Arc;
use api::v1::{ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
use catalog::CatalogManagerRef;
use common_catalog::consts::{default_engine, is_readonly_schema};
use common_catalog::format_full_table_name;
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::{TableMetadataManagerRef, NAME_PATTERN};
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::Partition;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::tracing;
use lazy_static::lazy_static;
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use table::dist_table::DistTable;
use table::metadata::{RawTableInfo, TableInfo};
use table::table_name::TableName;
use table::table_reference::TableReference;
use table::TableRef;
use crate::error::{
CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, EmptyDdlExprSnafu,
ExecuteDdlSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, Result, SchemaNotFoundSnafu,
SchemaReadOnlySnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, UnexpectedSnafu,
};
use crate::insert::build_create_table_expr;
use crate::statement::ddl::{create_table_info, parse_partitions};
// TODO(yingwen): Replaces operator::statement::ddl::NAME_PATTERN_REG
lazy_static! {
/// Regex to validate table name.
static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}
/// Helper to query and manipulate table schemas.
pub struct SchemaHelper {
catalog_manager: CatalogManagerRef,
table_metadata_manager: TableMetadataManagerRef,
procedure_executor: ProcedureExecutorRef,
}
impl SchemaHelper {
/// Creates a new [`SchemaHelper`].
pub fn new(
catalog_manager: CatalogManagerRef,
table_metadata_manager: TableMetadataManagerRef,
procedure_executor: ProcedureExecutorRef,
) -> Self {
Self {
catalog_manager,
table_metadata_manager,
procedure_executor,
}
}
/// Gets the table by catalog, schema and table name.
pub async fn get_table(
&self,
catalog: &str,
schema: &str,
table: &str,
) -> Result<Option<TableRef>> {
self.catalog_manager
.table(catalog, schema, table, None)
.await
.context(CatalogSnafu)
}
// TODO(yingwen): Remove Inserter::create_physical_table_on_demand()
// TODO(yingwen): Can we create the physical table with all columns from the prometheus metrics?
/// Creates a physical table for metric engine.
///
/// If table already exists, do nothing.
pub async fn create_metric_physical_table(
&self,
ctx: &QueryContextRef,
physical_table: String,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
// check if exist
if self
.get_table(catalog_name, &schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}
let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
common_telemetry::info!(
"Physical metric table `{table_reference}` does not exist, try creating table"
);
// schema with timestamp and field column
let default_schema = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,
options: None,
},
];
let create_table_expr =
&mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr
.table_options
.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
// create physical table.
// TODO(yingwen): Simplify this function. But remember to start the timer.
let res = self
.create_table_by_expr(create_table_expr, None, ctx.clone())
.await;
match res {
Ok(_) => {
common_telemetry::info!("Successfully created table {table_reference}",);
Ok(())
}
Err(err) => {
common_telemetry::error!(err; "Failed to create table {table_reference}");
Err(err)
}
}
}
// TODO(yingwen): Replace StatementExecutor::create_table_inner().
/// Creates a table by [CreateTableExpr].
#[tracing::instrument(skip_all)]
pub async fn create_table_by_expr(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<TableRef> {
ensure!(
!is_readonly_schema(&create_table.schema_name),
SchemaReadOnlySnafu {
name: create_table.schema_name.clone()
}
);
if create_table.engine == METRIC_ENGINE_NAME
&& create_table
.table_options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
// Create logical tables
ensure!(
partitions.is_none(),
InvalidPartitionRuleSnafu {
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
}
);
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
.await?
.into_iter()
.next()
.context(UnexpectedSnafu {
violated: "expected to create logical tables",
})
} else {
// Create other normal table
self.create_non_logic_table(create_table, partitions, query_ctx)
.await
}
}
// TODO(yingwen): Replaces StatementExecutor::create_non_logic_table()
/// Creates a non-logical table.
/// - If the schema doesn't exist, returns an error
/// - If the table already exists:
/// - If `create_if_not_exists` is true, returns the existing table
/// - If `create_if_not_exists` is false, returns an error
#[tracing::instrument(skip_all)]
pub async fn create_non_logic_table(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<TableRef> {
let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
// Check if schema exists
let schema = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(
&create_table.catalog_name,
&create_table.schema_name,
))
.await
.context(TableMetadataManagerSnafu)?;
ensure!(
schema.is_some(),
SchemaNotFoundSnafu {
schema_info: &create_table.schema_name,
}
);
// if table exists.
if let Some(table) = self
.catalog_manager
.table(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
Some(&query_ctx),
)
.await
.context(CatalogSnafu)?
{
return if create_table.create_if_not_exists {
Ok(table)
} else {
TableAlreadyExistsSnafu {
table: format_full_table_name(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
),
}
.fail()
};
}
ensure!(
NAME_PATTERN_REG.is_match(&create_table.table_name),
InvalidTableNameSnafu {
table_name: &create_table.table_name,
}
);
let table_name = TableName::new(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
);
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
let mut table_info = create_table_info(create_table, partition_cols)?;
let resp = self
.create_table_procedure(
create_table.clone(),
partitions,
table_info.clone(),
query_ctx,
)
.await?;
let table_id = resp.table_ids.into_iter().next().context(UnexpectedSnafu {
violated: "expected table_id",
})?;
common_telemetry::info!(
"Successfully created table '{table_name}' with table id {table_id}"
);
table_info.ident.table_id = table_id;
let table_info: Arc<TableInfo> =
Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
create_table.table_id = Some(api::v1::TableId { id: table_id });
let table = DistTable::table(table_info);
Ok(table)
}
// TODO(yingwen): Replace StatementExecutor::create_logical_tables
/// Creates logical tables.
#[tracing::instrument(skip_all)]
pub async fn create_logical_tables(
&self,
create_table_exprs: &[CreateTableExpr],
query_context: QueryContextRef,
) -> Result<Vec<TableRef>> {
let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
ensure!(
!create_table_exprs.is_empty(),
EmptyDdlExprSnafu {
name: "create logic tables"
}
);
// Check table names
for create_table in create_table_exprs {
ensure!(
NAME_PATTERN_REG.is_match(&create_table.table_name),
InvalidTableNameSnafu {
table_name: &create_table.table_name,
}
);
}
let mut raw_tables_info = create_table_exprs
.iter()
.map(|create| create_table_info(create, vec![]))
.collect::<Result<Vec<_>>>()?;
let tables_data = create_table_exprs
.iter()
.cloned()
.zip(raw_tables_info.iter().cloned())
.collect::<Vec<_>>();
let resp = self
.create_logical_tables_procedure(tables_data, query_context)
.await?;
let table_ids = resp.table_ids;
ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
});
common_telemetry::info!("Successfully created logical tables: {:?}", table_ids);
for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
table_info.ident.table_id = table_ids[i];
}
let tables_info = raw_tables_info
.into_iter()
.map(|x| x.try_into().context(CreateTableInfoSnafu))
.collect::<Result<Vec<_>>>()?;
Ok(tables_info
.into_iter()
.map(|x| DistTable::table(Arc::new(x)))
.collect())
}
// TODO(yingwen): Replace StatementExecutor::create_table_procedure
/// Submits a procedure to create a non-logical table.
async fn create_table_procedure(
&self,
create_table: CreateTableExpr,
partitions: Vec<Partition>,
table_info: RawTableInfo,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions.into_iter().map(Into::into).collect();
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_table(create_table, partitions, table_info),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
}
// TODO(yingwen): Replace StatementExecutor::create_logical_tables_procedure
/// Submits a procedure to create logical tables.
async fn create_logical_tables_procedure(
&self,
tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_logical_tables(tables_data),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
}
}

View File

@@ -18,7 +18,7 @@ mod copy_query_to;
mod copy_table_from;
mod copy_table_to;
mod cursor;
mod ddl;
pub(crate) mod ddl;
mod describe;
mod dml;
mod kill;

View File

@@ -1586,7 +1586,7 @@ impl StatementExecutor {
}
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
fn parse_partitions(
pub(crate) fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: &QueryContextRef,
@@ -1619,7 +1619,7 @@ fn parse_partitions(
))
}
fn create_table_info(
pub(crate) fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
) -> Result<RawTableInfo> {