refactor: remove Inserter::create_physical_table_on_demand

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-06-19 22:32:58 +08:00
committed by Lei, HUANG
parent deca8c44fa
commit 207709c727
2 changed files with 5 additions and 79 deletions

View File

@@ -22,8 +22,8 @@ use api::v1::region::{
RegionRequestHeader,
};
use api::v1::{
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
RowInsertRequest, RowInsertRequests, SemanticType,
AlterTableExpr, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest,
RowInsertRequests, SemanticType,
};
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
@@ -34,7 +34,6 @@ use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
@@ -48,9 +47,7 @@ use snafu::ResultExt;
use sql::partition::partition_rule_for_hexstring;
use sql::statements::create::Partitions;
use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo;
@@ -280,7 +277,8 @@ impl Inserter {
validate_column_count_match(&requests)?;
// check and create physical table
self.create_physical_table_on_demand(&ctx, physical_table.clone())
self.schema_helper
.create_metric_physical_table(&ctx, physical_table.clone())
.await?;
// check and create logical tables
@@ -667,69 +665,6 @@ impl Inserter {
})
}
async fn create_physical_table_on_demand(
&self,
ctx: &QueryContextRef,
physical_table: String,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
// check if exist
if self
.get_table(catalog_name, &schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}
let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
info!("Physical metric table `{table_reference}` does not exist, try creating table");
// schema with timestamp and field column
let default_schema = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: GREPTIME_VALUE.to_string(),
datatype: ColumnDataType::Float64 as _,
semantic_type: SemanticType::Field as _,
datatype_extension: None,
options: None,
},
];
let create_table_expr =
&mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr
.table_options
.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
// create physical table
let res = self
.schema_helper
.create_table_by_expr(create_table_expr, None, ctx.clone())
.await;
match res {
Ok(_) => {
info!("Successfully created table {table_reference}",);
Ok(())
}
Err(err) => {
error!(err; "Failed to create table {table_reference}");
Err(err)
}
}
}
async fn get_table(
&self,
catalog: &str,

View File

@@ -92,7 +92,6 @@ impl SchemaHelper {
.context(CatalogSnafu)
}
// TODO(yingwen): Remove Inserter::create_physical_table_on_demand()
// TODO(yingwen): Can we create the physical table with all columns from the prometheus metrics?
/// Creates a physical table for metric engine.
///
@@ -160,7 +159,6 @@ impl SchemaHelper {
}
}
// TODO(yingwen): Replace StatementExecutor::create_table_inner().
/// Creates a table by [CreateTableExpr].
#[tracing::instrument(skip_all)]
pub async fn create_table_by_expr(
@@ -202,7 +200,6 @@ impl SchemaHelper {
}
}
// TODO(yingwen): Replaces StatementExecutor::create_non_logic_table()
/// Creates a non-logical table.
/// - If the schema doesn't exist, returns an error
/// - If the table already exists:
@@ -303,7 +300,6 @@ impl SchemaHelper {
Ok(table)
}
// TODO(yingwen): Replace StatementExecutor::create_logical_tables
/// Creates logical tables.
#[tracing::instrument(skip_all)]
pub async fn create_logical_tables(
@@ -363,7 +359,6 @@ impl SchemaHelper {
.collect())
}
// TODO(yingwen): Replaces StatementExecutor::alter_table_inner
/// Alters a table by [AlterTableExpr].
#[tracing::instrument(skip_all)]
pub async fn alter_table_by_expr(
@@ -483,7 +478,6 @@ impl SchemaHelper {
Ok(Output::new_with_affected_rows(0))
}
// TODO(yingwen): Replaces StatementExecutor::alter_logical_tables
/// Alter logical tables.
pub async fn alter_logical_tables(
&self,
@@ -547,7 +541,6 @@ impl SchemaHelper {
&self.catalog_manager
}
// TODO(yingwen): Replace StatementExecutor::create_table_procedure
/// Submits a procedure to create a non-logical table.
async fn create_table_procedure(
&self,
@@ -569,7 +562,6 @@ impl SchemaHelper {
.context(ExecuteDdlSnafu)
}
// TODO(yingwen): Replace StatementExecutor::create_logical_tables_procedure
/// Submits a procedure to create logical tables.
async fn create_logical_tables_procedure(
&self,
@@ -587,7 +579,6 @@ impl SchemaHelper {
.context(ExecuteDdlSnafu)
}
// TODO(yingwen): Replace StatementExecutor::alter_logical_tables_procedure
/// Submits a procedure to alter logical tables.
async fn alter_logical_tables_procedure(
&self,