From 207709c72727bf08cd29f83b8890b2a102585c52 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 19 Jun 2025 22:32:58 +0800 Subject: [PATCH] refactor: remove Inserter::create_physical_table_on_demand Signed-off-by: evenyag --- src/operator/src/insert.rs | 75 +++---------------------------- src/operator/src/schema_helper.rs | 9 ---- 2 files changed, 5 insertions(+), 79 deletions(-) diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 0164662a8d..0c38f9a1dc 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -22,8 +22,8 @@ use api::v1::region::{ RegionRequestHeader, }; use api::v1::{ - AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests, - RowInsertRequest, RowInsertRequests, SemanticType, + AlterTableExpr, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest, + RowInsertRequests, SemanticType, }; use client::{OutputData, OutputMeta}; use common_catalog::consts::{ @@ -34,7 +34,6 @@ use common_grpc_expr::util::ColumnExpr; use common_meta::cache::TableFlownodeSetCacheRef; use common_meta::node_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::Output; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{error, info, warn}; @@ -48,9 +47,7 @@ use snafu::ResultExt; use sql::partition::partition_rule_for_hexstring; use sql::statements::create::Partitions; use sql::statements::insert::Insert; -use store_api::metric_engine_consts::{ - LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY, -}; +use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY}; use store_api::storage::{RegionId, TableId}; use table::metadata::TableInfo; @@ -280,7 +277,8 @@ impl Inserter { validate_column_count_match(&requests)?; // check and create physical table - self.create_physical_table_on_demand(&ctx, physical_table.clone()) + self.schema_helper + .create_metric_physical_table(&ctx, physical_table.clone()) .await?; // check and create logical tables @@ -667,69 +665,6 @@ impl Inserter { }) } - async fn create_physical_table_on_demand( - &self, - ctx: &QueryContextRef, - physical_table: String, - ) -> Result<()> { - let catalog_name = ctx.current_catalog(); - let schema_name = ctx.current_schema(); - - // check if exist - if self - .get_table(catalog_name, &schema_name, &physical_table) - .await? - .is_some() - { - return Ok(()); - } - - let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table); - info!("Physical metric table `{table_reference}` does not exist, try creating table"); - - // schema with timestamp and field column - let default_schema = vec![ - ColumnSchema { - column_name: GREPTIME_TIMESTAMP.to_string(), - datatype: ColumnDataType::TimestampMillisecond as _, - semantic_type: SemanticType::Timestamp as _, - datatype_extension: None, - options: None, - }, - ColumnSchema { - column_name: GREPTIME_VALUE.to_string(), - datatype: ColumnDataType::Float64 as _, - semantic_type: SemanticType::Field as _, - datatype_extension: None, - options: None, - }, - ]; - let create_table_expr = - &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?; - - create_table_expr.engine = METRIC_ENGINE_NAME.to_string(); - create_table_expr - .table_options - .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string()); - - // create physical table - let res = self - .schema_helper - .create_table_by_expr(create_table_expr, None, ctx.clone()) - .await; - - match res { - Ok(_) => { - info!("Successfully created table {table_reference}",); - Ok(()) - } - Err(err) => { - error!(err; "Failed to create table {table_reference}"); - Err(err) - } - } - } - async fn get_table( &self, catalog: &str, diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs index a4e3d3d092..cef839716e 100644 --- a/src/operator/src/schema_helper.rs +++ b/src/operator/src/schema_helper.rs @@ -92,7 +92,6 @@ impl SchemaHelper { .context(CatalogSnafu) } - // TODO(yingwen): Remove Inserter::create_physical_table_on_demand() // TODO(yingwen): Can we create the physical table with all columns from the prometheus metrics? /// Creates a physical table for metric engine. /// @@ -160,7 +159,6 @@ impl SchemaHelper { } } - // TODO(yingwen): Replace StatementExecutor::create_table_inner(). /// Creates a table by [CreateTableExpr]. #[tracing::instrument(skip_all)] pub async fn create_table_by_expr( @@ -202,7 +200,6 @@ impl SchemaHelper { } } - // TODO(yingwen): Replaces StatementExecutor::create_non_logic_table() /// Creates a non-logical table. /// - If the schema doesn't exist, returns an error /// - If the table already exists: @@ -303,7 +300,6 @@ impl SchemaHelper { Ok(table) } - // TODO(yingwen): Replace StatementExecutor::create_logical_tables /// Creates logical tables. #[tracing::instrument(skip_all)] pub async fn create_logical_tables( @@ -363,7 +359,6 @@ impl SchemaHelper { .collect()) } - // TODO(yingwen): Replaces StatementExecutor::alter_table_inner /// Alters a table by [AlterTableExpr]. #[tracing::instrument(skip_all)] pub async fn alter_table_by_expr( @@ -483,7 +478,6 @@ impl SchemaHelper { Ok(Output::new_with_affected_rows(0)) } - // TODO(yingwen): Replaces StatementExecutor::alter_logical_tables /// Alter logical tables. pub async fn alter_logical_tables( &self, @@ -547,7 +541,6 @@ impl SchemaHelper { &self.catalog_manager } - // TODO(yingwen): Replace StatementExecutor::create_table_procedure /// Submits a procedure to create a non-logical table. async fn create_table_procedure( &self, @@ -569,7 +562,6 @@ impl SchemaHelper { .context(ExecuteDdlSnafu) } - // TODO(yingwen): Replace StatementExecutor::create_logical_tables_procedure /// Submits a procedure to create logical tables. async fn create_logical_tables_procedure( &self, @@ -587,7 +579,6 @@ impl SchemaHelper { .context(ExecuteDdlSnafu) } - // TODO(yingwen): Replace StatementExecutor::alter_logical_tables_procedure /// Submits a procedure to alter logical tables. async fn alter_logical_tables_procedure( &self,