diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs index cef839716e..41d22045b2 100644 --- a/src/operator/src/schema_helper.rs +++ b/src/operator/src/schema_helper.rs @@ -17,12 +17,17 @@ use std::collections::HashMap; use std::sync::Arc; -use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType}; +use api::v1::alter_table_expr::Kind; +use api::v1::{ + AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef, ColumnSchema, + CreateTableExpr, SemanticType, +}; use catalog::CatalogManagerRef; use common_catalog::consts::{ default_engine, is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, }; use common_catalog::format_full_table_name; +use common_grpc_expr::util::ColumnExpr; use common_meta::cache_invalidator::{CacheInvalidatorRef, Context}; use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef}; use common_meta::instruction::CacheIdent; @@ -51,6 +56,7 @@ use crate::error::{ Result, SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnexpectedSnafu, }; +use crate::expr_helper; use crate::insert::build_create_table_expr; use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter, NAME_PATTERN_REG}; @@ -596,3 +602,116 @@ impl SchemaHelper { .context(ExecuteDdlSnafu) } } + +/// Schema of a logical table. +struct LogicalSchema { + /// Name of the logical table. + name: String, + /// Schema of columns in the logical table. + columns: Vec, +} + +/// Logical table schemas. +struct LogicalSchemas { + /// Logical table schemas group by physical table name. + schemas: HashMap>, +} + +/// Creates or alters logical tables to match the provided schemas +/// for prometheus metrics. +async fn ensure_logical_tables_for_metrics( + helper: &SchemaHelper, + schemas: &LogicalSchemas, + query_ctx: &QueryContextRef, +) -> Result<()> { + // 1. For each physical table, creates it if it doesn't exist. + for (physical_table_name, _) in &schemas.schemas { + // Check if the physical table exists and create it if it doesn't + let physical_table_opt = helper + .get_table( + &query_ctx.current_catalog(), + &query_ctx.current_schema(), + physical_table_name, + ) + .await?; + + if physical_table_opt.is_none() { + // Physical table doesn't exist, create it + helper + .create_metric_physical_table(query_ctx, physical_table_name.clone()) + .await?; + } + } + + // 2. Collects logical tables that do not exist. (CreateTableExpr) + let mut tables_to_create: Vec = Vec::new(); + + // 3. Collects alterations (columns to add) for each logical table. (AlterTableExpr) + let mut tables_to_alter: Vec = Vec::new(); + + let catalog_name = query_ctx.current_catalog(); + let schema_name = query_ctx.current_schema(); + // Process each logical table to determine if it needs to be created or altered + for (physical_table_name, logical_schemas) in &schemas.schemas { + for logical_schema in logical_schemas { + let table_name = &logical_schema.name; + + // Check if the logical table exists + let table_opt = helper + .get_table(catalog_name, &schema_name, table_name) + .await?; + + if let Some(existing_table) = table_opt { + // Logical table exists, determine if it needs alteration + let existing_schema = existing_table.schema(); + let column_exprs = ColumnExpr::from_column_schemas(&logical_schema.columns); + let add_columns = + expr_helper::extract_add_columns_expr(&existing_schema, column_exprs)?; + let Some(add_columns) = add_columns else { + continue; + }; + + let alter_expr = AlterTableExpr { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.clone(), + table_name: table_name.to_string(), + kind: Some(Kind::AddColumns(add_columns)), + }; + tables_to_alter.push(alter_expr); + } else { + // Logical table doesn't exist, prepare for creation + // Build a CreateTableExpr from the table reference and columns + let table_ref = TableReference::full(catalog_name, &schema_name, table_name); + let mut create_expr = build_create_table_expr( + &table_ref, + &logical_schema.columns, + METRIC_ENGINE_NAME, + )?; + create_expr.create_if_not_exists = true; + // Add the logical table metadata key to link with physical table + create_expr.table_options.insert( + LOGICAL_TABLE_METADATA_KEY.to_string(), + physical_table_name.clone(), + ); + + tables_to_create.push(create_expr); + } + } + } + + // 4. Creates logical tables in batch using `create_logical_tables()`. + if !tables_to_create.is_empty() { + helper + .create_logical_tables(&tables_to_create, query_ctx.clone()) + .await?; + } + + // 5. Alters logical tables in batch using `alter_logical_tables()`. + if !tables_to_alter.is_empty() { + helper + .alter_logical_tables(tables_to_alter, query_ctx.clone()) + .await?; + } + + Ok(()) +}