diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index d065253d9b..c569591e00 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -26,7 +26,7 @@ use common_error::ext::BoxedError; use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef}; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::flow::FlowMetadataManagerRef; -use common_meta::key::TableMetadataManagerRef; +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::{Flownode, NodeManagerRef}; use common_query::Output; @@ -37,6 +37,7 @@ use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertReq use itertools::Itertools; use operator::delete::Deleter; use operator::insert::Inserter; +use operator::schema_helper::SchemaHelper; use operator::statement::StatementExecutor; use partition::manager::PartitionRuleManager; use query::{QueryEngine, QueryEngineFactory}; @@ -546,8 +547,14 @@ impl FrontendInvoker { name: TABLE_FLOWNODE_SET_CACHE_NAME, })?; - let inserter = Arc::new(Inserter::new( + let schema_helper = SchemaHelper::new( catalog_manager.clone(), + Arc::new(TableMetadataManager::new(kv_backend.clone())), + procedure_executor.clone(), + layered_cache_registry.clone(), + ); + let inserter = Arc::new(Inserter::new( + schema_helper, partition_manager.clone(), node_manager.clone(), table_flownode_cache, @@ -588,7 +595,7 @@ impl FrontendInvoker { .start_timer(); self.inserter - .handle_row_inserts(requests, ctx, &self.statement_executor, false, false) + .handle_row_inserts(requests, ctx, false, false) .await .map_err(BoxedError::new) .context(common_frontend::error::ExternalSnafu) diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index d92bd5737a..073f6b0eff 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -116,8 +116,7 @@ impl Frontend { if let Some(t) = self.export_metrics_task.as_ref() { if t.send_by_handler { let inserter = self.instance.inserter().clone(); - let statement_executor = self.instance.statement_executor().clone(); - let handler = ExportMetricHandler::new_handler(inserter, statement_executor); + let handler = ExportMetricHandler::new_handler(inserter); t.start(Some(handler)).context(error::StartServerSnafu)? } else { t.start(None).context(error::StartServerSnafu)?; diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index e9c132da42..3ef7552124 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -30,6 +30,7 @@ use operator::flow::FlowServiceOperator; use operator::insert::Inserter; use operator::procedure::ProcedureServiceOperator; use operator::request::Requester; +use operator::schema_helper::SchemaHelper; use operator::statement::{StatementExecutor, StatementExecutorRef}; use operator::table::TableMutationOperator; use partition::manager::PartitionRuleManager; @@ -130,8 +131,15 @@ impl FrontendBuilder { name: TABLE_FLOWNODE_SET_CACHE_NAME, })?; - let inserter = Arc::new(Inserter::new( + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let schema_helper = SchemaHelper::new( self.catalog_manager.clone(), + table_metadata_manager.clone(), + self.procedure_executor.clone(), + local_cache_invalidator.clone(), + ); + let inserter = Arc::new(Inserter::new( + schema_helper, partition_manager.clone(), node_manager.clone(), table_flownode_cache, @@ -176,7 +184,7 @@ impl FrontendBuilder { self.catalog_manager.clone(), query_engine.clone(), self.procedure_executor, - kv_backend.clone(), + kv_backend, local_cache_invalidator, inserter.clone(), table_route_cache, @@ -219,7 +227,7 @@ impl FrontendBuilder { plugins, inserter, deleter, - table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), + table_metadata_manager, slow_query_recorder, limiter, process_manager, diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 5383bd931a..69c78734ad 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -408,7 +408,7 @@ impl Instance { ctx: QueryContextRef, ) -> Result { self.inserter - .handle_column_inserts(requests, ctx, self.statement_executor.as_ref()) + .handle_column_inserts(requests, ctx) .await .context(TableOperationSnafu) } @@ -422,13 +422,7 @@ impl Instance { is_single_value: bool, ) -> Result { self.inserter - .handle_row_inserts( - requests, - ctx, - self.statement_executor.as_ref(), - accommodate_existing_schema, - is_single_value, - ) + .handle_row_inserts(requests, ctx, accommodate_existing_schema, is_single_value) .await .context(TableOperationSnafu) } @@ -441,10 +435,7 @@ impl Instance { ) -> Result { self.inserter .handle_last_non_null_inserts( - requests, - ctx, - self.statement_executor.as_ref(), - true, + requests, ctx, true, // Influx protocol may writes multiple fields (values). false, ) @@ -460,7 +451,7 @@ impl Instance { physical_table: String, ) -> Result { self.inserter - .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table) + .handle_metric_row_inserts(requests, ctx, physical_table) .await .context(TableOperationSnafu) } diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 179d5e098f..8faf178610 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -135,7 +135,7 @@ impl Instance { }; self.inserter - .handle_log_inserts(log, ctx, self.statement_executor.as_ref()) + .handle_log_inserts(log, ctx) .await .map_err(BoxedError::new) .context(ExecuteGrpcRequestSnafu) @@ -157,7 +157,7 @@ impl Instance { }; self.inserter - .handle_trace_inserts(rows, ctx, self.statement_executor.as_ref()) + .handle_trace_inserts(rows, ctx) .await .map_err(BoxedError::new) .context(ExecuteGrpcRequestSnafu) diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 13a02ff476..a73e42a5d1 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -28,7 +28,6 @@ use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::{debug, tracing}; use operator::insert::InserterRef; -use operator::statement::StatementExecutor; use prost::Message; use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult}; use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF}; @@ -271,18 +270,11 @@ impl PromStoreProtocolHandler for Instance { /// so only implement `PromStoreProtocolHandler::write` method. pub struct ExportMetricHandler { inserter: InserterRef, - statement_executor: Arc, } impl ExportMetricHandler { - pub fn new_handler( - inserter: InserterRef, - statement_executor: Arc, - ) -> PromStoreProtocolHandlerRef { - Arc::new(Self { - inserter, - statement_executor, - }) + pub fn new_handler(inserter: InserterRef) -> PromStoreProtocolHandlerRef { + Arc::new(Self { inserter }) } } @@ -295,12 +287,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler { _: bool, ) -> ServerResult { self.inserter - .handle_metric_row_inserts( - request, - ctx, - &self.statement_executor, - GREPTIME_PHYSICAL_TABLE.to_string(), - ) + .handle_metric_row_inserts(request, ctx, GREPTIME_PHYSICAL_TABLE.to_string()) .await .map_err(BoxedError::new) .context(error::ExecuteGrpcQuerySnafu) diff --git a/src/frontend/src/slow_query_recorder.rs b/src/frontend/src/slow_query_recorder.rs index c87a25f4e3..593ba736d7 100644 --- a/src/frontend/src/slow_query_recorder.rs +++ b/src/frontend/src/slow_query_recorder.rs @@ -233,7 +233,7 @@ impl SlowQueryEventHandler { .into(); self.inserter - .handle_row_inserts(requests, query_ctx, &self.statement_executor, false, false) + .handle_row_inserts(requests, query_ctx, false, false) .await .context(TableOperationSnafu)?; diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index e95060b872..933b845886 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -25,7 +25,6 @@ use api::v1::{ AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest, RowInsertRequests, SemanticType, }; -use catalog::CatalogManagerRef; use client::{OutputData, OutputMeta}; use common_catalog::consts::{ default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, @@ -63,7 +62,7 @@ use table::table_reference::TableReference; use table::TableRef; use crate::error::{ - CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu, + ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu, InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu, }; use crate::expr_helper; @@ -72,10 +71,10 @@ use crate::req_convert::common::preprocess_row_insert_requests; use crate::req_convert::insert::{ fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, }; -use crate::statement::StatementExecutor; +use crate::schema_helper::SchemaHelper; pub struct Inserter { - catalog_manager: CatalogManagerRef, + schema_helper: SchemaHelper, pub(crate) partition_manager: PartitionRuleManagerRef, pub(crate) node_manager: NodeManagerRef, pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef, @@ -127,13 +126,13 @@ pub struct InstantAndNormalInsertRequests { impl Inserter { pub fn new( - catalog_manager: CatalogManagerRef, + schema_helper: SchemaHelper, partition_manager: PartitionRuleManagerRef, node_manager: NodeManagerRef, table_flownode_set_cache: TableFlownodeSetCacheRef, ) -> Self { Self { - catalog_manager, + schema_helper, partition_manager, node_manager, table_flownode_set_cache, @@ -144,10 +143,9 @@ impl Inserter { &self, requests: InsertRequests, ctx: QueryContextRef, - statement_executor: &StatementExecutor, ) -> Result { let row_inserts = ColumnToRow::convert(requests)?; - self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false) + self.handle_row_inserts(row_inserts, ctx, false, false) .await } @@ -156,7 +154,6 @@ impl Inserter { &self, mut requests: RowInsertRequests, ctx: QueryContextRef, - statement_executor: &StatementExecutor, accommodate_existing_schema: bool, is_single_value: bool, ) -> Result { @@ -164,7 +161,6 @@ impl Inserter { self.handle_row_inserts_with_create_type( requests, ctx, - statement_executor, AutoCreateTableType::Physical, accommodate_existing_schema, is_single_value, @@ -177,12 +173,10 @@ impl Inserter { &self, requests: RowInsertRequests, ctx: QueryContextRef, - statement_executor: &StatementExecutor, ) -> Result { self.handle_row_inserts_with_create_type( requests, ctx, - statement_executor, AutoCreateTableType::Log, false, false, @@ -194,12 +188,10 @@ impl Inserter { &self, requests: RowInsertRequests, ctx: QueryContextRef, - statement_executor: &StatementExecutor, ) -> Result { self.handle_row_inserts_with_create_type( requests, ctx, - statement_executor, AutoCreateTableType::Trace, false, false, @@ -212,14 +204,12 @@ impl Inserter { &self, requests: RowInsertRequests, ctx: QueryContextRef, - statement_executor: &StatementExecutor, accommodate_existing_schema: bool, is_single_value: bool, ) -> Result { self.handle_row_inserts_with_create_type( requests, ctx, - statement_executor, AutoCreateTableType::LastNonNull, accommodate_existing_schema, is_single_value, @@ -232,7 +222,6 @@ impl Inserter { &self, mut requests: RowInsertRequests, ctx: QueryContextRef, - statement_executor: &StatementExecutor, create_type: AutoCreateTableType, accommodate_existing_schema: bool, is_single_value: bool, @@ -254,7 +243,6 @@ impl Inserter { &mut requests, &ctx, create_type, - statement_executor, accommodate_existing_schema, is_single_value, ) @@ -280,7 +268,6 @@ impl Inserter { &self, mut requests: RowInsertRequests, ctx: QueryContextRef, - statement_executor: &StatementExecutor, physical_table: String, ) -> Result { // remove empty requests @@ -293,7 +280,7 @@ impl Inserter { validate_column_count_match(&requests)?; // check and create physical table - self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor) + self.create_physical_table_on_demand(&ctx, physical_table.clone()) .await?; // check and create logical tables @@ -305,7 +292,6 @@ impl Inserter { &mut requests, &ctx, AutoCreateTableType::Logical(physical_table.to_string()), - statement_executor, true, true, ) @@ -350,10 +336,13 @@ impl Inserter { insert: &Insert, ctx: &QueryContextRef, ) -> Result { - let (inserts, table_info) = - StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx) - .convert(insert, ctx) - .await?; + let (inserts, table_info) = StatementToRegion::new( + self.schema_helper.catalog_manager().as_ref(), + &self.partition_manager, + ctx, + ) + .convert(insert, ctx) + .await?; let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter()); @@ -482,7 +471,6 @@ impl Inserter { requests: &mut RowInsertRequests, ctx: &QueryContextRef, auto_create_table_type: AutoCreateTableType, - statement_executor: &StatementExecutor, accommodate_existing_schema: bool, is_single_value: bool, ) -> Result { @@ -543,7 +531,7 @@ impl Inserter { instant_table_ids.insert(table_info.table_id()); } table_infos.insert(table_info.table_id(), table.table_info()); - if let Some(alter_expr) = self.get_alter_table_expr_on_demand( + if let Some(alter_expr) = Self::get_alter_table_expr_on_demand( req, &table, ctx, @@ -565,9 +553,7 @@ impl Inserter { AutoCreateTableType::Logical(_) => { if !create_tables.is_empty() { // Creates logical tables in batch. - let tables = self - .create_logical_tables(create_tables, ctx, statement_executor) - .await?; + let tables = self.create_logical_tables(create_tables, ctx).await?; for table in tables { let table_info = table.table_info(); @@ -579,7 +565,7 @@ impl Inserter { } if !alter_tables.is_empty() { // Alter logical tables in batch. - statement_executor + self.schema_helper .alter_logical_tables(alter_tables, ctx.clone()) .await?; } @@ -590,9 +576,7 @@ impl Inserter { // note that auto create table shouldn't be ttl instant table // for it's a very unexpected behavior and should be set by user explicitly for create_table in create_tables { - let table = self - .create_physical_table(create_table, None, ctx, statement_executor) - .await?; + let table = self.create_physical_table(create_table, None, ctx).await?; let table_info = table.table_info(); if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); @@ -600,8 +584,8 @@ impl Inserter { table_infos.insert(table_info.table_id(), table.table_info()); } for alter_expr in alter_tables.into_iter() { - statement_executor - .alter_table_inner(alter_expr, ctx.clone()) + self.schema_helper + .alter_table_by_expr(alter_expr, ctx.clone()) .await?; } } @@ -619,9 +603,7 @@ impl Inserter { create_table .table_options .insert(APPEND_MODE_KEY.to_string(), "false".to_string()); - let table = self - .create_physical_table(create_table, None, ctx, statement_executor) - .await?; + let table = self.create_physical_table(create_table, None, ctx).await?; let table_info = table.table_info(); if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); @@ -662,12 +644,7 @@ impl Inserter { ); let table = self - .create_physical_table( - create_table, - Some(partitions), - ctx, - statement_executor, - ) + .create_physical_table(create_table, Some(partitions), ctx) .await?; let table_info = table.table_info(); if table_info.is_ttl_instant_table() { @@ -677,8 +654,8 @@ impl Inserter { } } for alter_expr in alter_tables.into_iter() { - statement_executor - .alter_table_inner(alter_expr, ctx.clone()) + self.schema_helper + .alter_table_by_expr(alter_expr, ctx.clone()) .await?; } } @@ -694,7 +671,6 @@ impl Inserter { &self, ctx: &QueryContextRef, physical_table: String, - statement_executor: &StatementExecutor, ) -> Result<()> { let catalog_name = ctx.current_catalog(); let schema_name = ctx.current_schema(); @@ -737,8 +713,9 @@ impl Inserter { .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string()); // create physical table - let res = statement_executor - .create_table_inner(create_table_expr, None, ctx.clone()) + let res = self + .schema_helper + .create_table_by_expr(create_table_expr, None, ctx.clone()) .await; match res { @@ -759,10 +736,7 @@ impl Inserter { schema: &str, table: &str, ) -> Result> { - self.catalog_manager - .table(catalog, schema, table, None) - .await - .context(CatalogSnafu) + self.schema_helper.get_table(catalog, schema, table).await } fn get_create_table_expr_on_demand( @@ -830,7 +804,6 @@ impl Inserter { /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the /// input `req`. fn get_alter_table_expr_on_demand( - &self, req: &mut RowInsertRequest, table: &TableRef, ctx: &QueryContextRef, @@ -918,7 +891,6 @@ impl Inserter { mut create_table_expr: CreateTableExpr, partitions: Option, ctx: &QueryContextRef, - statement_executor: &StatementExecutor, ) -> Result { { let table_ref = TableReference::full( @@ -929,8 +901,9 @@ impl Inserter { info!("Table `{table_ref}` does not exist, try creating table"); } - let res = statement_executor - .create_table_inner(&mut create_table_expr, partitions, ctx.clone()) + let res = self + .schema_helper + .create_table_by_expr(&mut create_table_expr, partitions, ctx.clone()) .await; let table_ref = TableReference::full( @@ -958,9 +931,9 @@ impl Inserter { &self, create_table_exprs: Vec, ctx: &QueryContextRef, - statement_executor: &StatementExecutor, ) -> Result> { - let res = statement_executor + let res = self + .schema_helper .create_logical_tables(&create_table_exprs, ctx.clone()) .await; @@ -1145,19 +1118,14 @@ mod tests { use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_meta::cache::new_table_flownode_set_cache; - use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler; - use common_meta::test_util::MockDatanodeManager; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; - use moka::future::Cache; use session::context::QueryContext; use table::dist_table::DummyDataSource; use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType}; use table::TableRef; use super::*; - use crate::tests::{create_partition_rule_manager, prepare_mocked_backend}; fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef { let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![ @@ -1237,20 +1205,8 @@ mod tests { DEFAULT_SCHEMA_NAME, )); - let kv_backend = prepare_mocked_backend().await; - let inserter = Inserter::new( - catalog::memory::MemoryCatalogManager::new(), - create_partition_rule_manager(kv_backend.clone()).await, - Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)), - Arc::new(new_table_flownode_set_cache( - String::new(), - Cache::new(100), - kv_backend.clone(), - )), - ); - let alter_expr = inserter - .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true) - .unwrap(); + let alter_expr = + Inserter::get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true).unwrap(); assert!(alter_expr.is_none()); // The request's schema should have updated names for timestamp and field columns diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs index b4d146465f..dfcd93918d 100644 --- a/src/operator/src/schema_helper.rs +++ b/src/operator/src/schema_helper.rs @@ -62,7 +62,7 @@ lazy_static! { static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap(); } -/// Helper to query and manipulate table schemas. +/// Helper to query and manipulate (CREATE/ALTER) table schemas. #[derive(Clone)] pub struct SchemaHelper { catalog_manager: CatalogManagerRef, diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index 6ad190cf23..f03644b67c 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -88,7 +88,6 @@ impl PipelineOperator { catalog.to_string(), Arc::new(PipelineTable::new( self.inserter.clone(), - self.statement_executor.clone(), table, self.query_engine.clone(), )), diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index 4ca656a008..bb1492b963 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -30,7 +30,6 @@ use datatypes::timestamp::TimestampNanosecond; use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector}; use itertools::Itertools; use operator::insert::InserterRef; -use operator::statement::StatementExecutorRef; use query::dataframe::DataFrame; use query::QueryEngineRef; use session::context::{QueryContextBuilder, QueryContextRef}; @@ -61,7 +60,6 @@ pub(crate) const EMPTY_SCHEMA_NAME: &str = ""; /// Every catalog has its own pipeline table. pub struct PipelineTable { inserter: InserterRef, - statement_executor: StatementExecutorRef, table: TableRef, query_engine: QueryEngineRef, cache: PipelineCache, @@ -69,15 +67,9 @@ pub struct PipelineTable { impl PipelineTable { /// Create a new PipelineTable. - pub fn new( - inserter: InserterRef, - statement_executor: StatementExecutorRef, - table: TableRef, - query_engine: QueryEngineRef, - ) -> Self { + pub fn new(inserter: InserterRef, table: TableRef, query_engine: QueryEngineRef) -> Self { Self { inserter, - statement_executor, table, query_engine, cache: PipelineCache::new(), @@ -232,13 +224,7 @@ impl PipelineTable { let output = self .inserter - .handle_row_inserts( - requests, - Self::query_ctx(&table_info), - &self.statement_executor, - false, - false, - ) + .handle_row_inserts(requests, Self::query_ctx(&table_info), false, false) .await .context(InsertPipelineSnafu)?;