From ba679dddfaaca5cd451443321ddc3eea0397851f Mon Sep 17 00:00:00 2001 From: QuakeWang <45645138+QuakeWang@users.noreply.github.com> Date: Thu, 21 May 2026 15:16:22 +0800 Subject: [PATCH] fix: track INSERT SELECT in process manager (#8138) * fix: track insert select in process list Signed-off-by: QuakeWang * fix: avoid generic process tracking future Signed-off-by: QuakeWang --------- Signed-off-by: QuakeWang --- src/catalog/src/memory/manager.rs | 29 +++ src/frontend/src/instance.rs | 380 +++++++++++++++++++++++++++--- src/sql/src/statements/insert.rs | 46 ++++ 3 files changed, 424 insertions(+), 31 deletions(-) diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index 6e747f62ed..2aeec17a71 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -33,6 +33,8 @@ use table::metadata::{TableId, TableInfoRef}; use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; use crate::information_schema::InformationSchemaProvider; +#[cfg(any(test, feature = "testing"))] +use crate::process_manager::ProcessManagerRef; use crate::system_schema::SystemSchemaProvider; use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest}; @@ -258,6 +260,33 @@ impl MemoryCatalogManager { manager } + #[cfg(any(test, feature = "testing"))] + pub fn register_process_list_table(&self, process_manager: ProcessManagerRef) { + let backend = Arc::new(MemoryKvBackend::new()); + let manager = Arc::new(self.clone()); + let information_schema_provider = InformationSchemaProvider::new( + DEFAULT_CATALOG_NAME.to_string(), + Arc::downgrade(&manager) as Weak, + Arc::new(FlowMetadataManager::new(backend.clone())), + Some(process_manager), + backend, + ); + let process_list = information_schema_provider + .table(crate::information_schema::PROCESS_LIST) + .unwrap(); + self.catalogs + .write() + .unwrap() + .get_mut(DEFAULT_CATALOG_NAME) + .unwrap() + .get_mut(INFORMATION_SCHEMA_NAME) + .unwrap() + .insert( + crate::information_schema::PROCESS_LIST.to_string(), + process_list, + ); + } + fn schema_exist_sync(&self, catalog: &str, schema: &str) -> Result { Ok(self .catalogs diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 359d59ccf8..24075601f6 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -202,21 +202,25 @@ impl Instance { let query_interceptor = self.plugins.get::>(); let query_interceptor = query_interceptor.as_ref(); - if stmt.is_readonly() { - let slow_query_timer = self - .slow_query_options - .enable - .then(|| self.event_recorder.clone()) - .flatten() - .map(|event_recorder| { - SlowQueryTimer::new( - CatalogQueryStatement::Sql(stmt.clone()), - self.slow_query_options.threshold, - self.slow_query_options.sample_ratio, - self.slow_query_options.record_type, - event_recorder, - ) - }); + let is_readonly_stmt = stmt.is_readonly(); + if should_track_statement_process(&stmt) { + let slow_query_timer = if is_readonly_stmt { + self.slow_query_options + .enable + .then(|| self.event_recorder.clone()) + .flatten() + .map(|event_recorder| { + SlowQueryTimer::new( + CatalogQueryStatement::Sql(stmt.clone()), + self.slow_query_options.threshold, + self.slow_query_options.sample_ratio, + self.slow_query_options.record_type, + event_recorder, + ) + }) + } else { + None + }; let ticket = self.process_manager.register_query( query_ctx.current_catalog().to_string(), @@ -654,21 +658,25 @@ impl Instance { .map(|s| s.to_string()) .unwrap_or_else(|| plan.display_indent().to_string()); - let result = if is_readonly_plan(&plan) { - let slow_query_timer = self - .slow_query_options - .enable - .then(|| self.event_recorder.clone()) - .flatten() - .map(|event_recorder| { - SlowQueryTimer::new( - CatalogQueryStatement::Plan(query.clone()), - self.slow_query_options.threshold, - self.slow_query_options.sample_ratio, - self.slow_query_options.record_type, - event_recorder, - ) - }); + let plan_is_readonly = is_readonly_plan(&plan); + let result = if should_track_plan_process(stmt.as_ref(), &plan) { + let slow_query_timer = if plan_is_readonly { + self.slow_query_options + .enable + .then(|| self.event_recorder.clone()) + .flatten() + .map(|event_recorder| { + SlowQueryTimer::new( + CatalogQueryStatement::Plan(query.clone()), + self.slow_query_options.threshold, + self.slow_query_options.sample_ratio, + self.slow_query_options.record_type, + event_recorder, + ) + }) + } else { + None + }; let ticket = self.process_manager.register_query( query_ctx.current_catalog().to_string(), @@ -1183,6 +1191,16 @@ fn is_readonly_plan(plan: &LogicalPlan) -> bool { !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_)) } +fn should_track_statement_process(stmt: &Statement) -> bool { + stmt.is_readonly() + || matches!(stmt, Statement::Insert(insert) if insert.has_non_values_query_source()) +} + +fn should_track_plan_process(stmt: Option<&Statement>, plan: &LogicalPlan) -> bool { + is_readonly_plan(plan) + || matches!(stmt, Some(Statement::Insert(insert)) if insert.has_non_values_query_source()) +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -1191,13 +1209,232 @@ mod tests { use std::thread; use std::time::{Duration, Instant}; + use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse}; + use catalog::process_manager::ProcessManager; use common_base::Plugins; + use common_meta::cache::LayeredCacheRegistryBuilder; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor}; + use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; + use common_meta::rpc::procedure::{ + MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, + }; + use common_query::Output; + use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion_expr::dml::InsertOp; + use datafusion_expr::{LogicalPlanBuilder, LogicalTableSource}; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema as GtSchema}; use query::query_engine::options::QueryOptions; - use session::context::QueryContext; + use session::context::{Channel, ConnInfo, QueryContext, QueryContextBuilder}; use sql::dialect::GreptimeDbDialect; use strfmt::Format; + use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + use table::test_util::EmptyTable; + use tokio::sync::{mpsc, oneshot}; use super::*; + use crate::frontend::FrontendOptions; + use crate::instance::builder::FrontendBuilder; + + fn parse_one_sql(sql: &str) -> Statement { + parse_stmt(sql, &GreptimeDbDialect {}).unwrap().remove(0) + } + + fn test_query_ctx(process_id: u32) -> QueryContextRef { + Arc::new( + QueryContextBuilder::default() + .channel(Channel::Mysql) + .conn_info(ConnInfo::new(None, Channel::Mysql)) + .process_id(process_id) + .build(), + ) + } + + struct BlockingInsertSelectInterceptor { + started_tx: mpsc::UnboundedSender<()>, + finish_rx: std::sync::Mutex>>, + } + + impl BlockingInsertSelectInterceptor { + fn new(started_tx: mpsc::UnboundedSender<()>, finish_rx: oneshot::Receiver<()>) -> Self { + Self { + started_tx, + finish_rx: std::sync::Mutex::new(Some(finish_rx)), + } + } + } + + impl SqlQueryInterceptor for BlockingInsertSelectInterceptor { + type Error = Error; + + fn pre_execute( + &self, + statement: &Statement, + _plan: Option<&LogicalPlan>, + _query_ctx: QueryContextRef, + ) -> Result<()> { + let Statement::Insert(insert) = statement else { + return Ok(()); + }; + if !insert.has_non_values_query_source() { + return Ok(()); + } + + let finish_rx = self.finish_rx.lock().unwrap().take().unwrap(); + let _ = self.started_tx.send(()); + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current() + .block_on(finish_rx) + .unwrap(); + }); + Ok(()) + } + } + + struct NoopProcedureExecutor; + + #[async_trait::async_trait] + impl ProcedureExecutor for NoopProcedureExecutor { + async fn submit_ddl_task( + &self, + _ctx: &ExecutorContext, + _request: SubmitDdlTaskRequest, + ) -> common_meta::error::Result { + common_meta::error::UnsupportedSnafu { + operation: "submit_ddl_task", + } + .fail() + } + + async fn migrate_region( + &self, + _ctx: &ExecutorContext, + _request: MigrateRegionRequest, + ) -> common_meta::error::Result { + common_meta::error::UnsupportedSnafu { + operation: "migrate_region", + } + .fail() + } + + async fn reconcile( + &self, + _ctx: &ExecutorContext, + _request: ReconcileRequest, + ) -> common_meta::error::Result { + common_meta::error::UnsupportedSnafu { + operation: "reconcile", + } + .fail() + } + + async fn query_procedure_state( + &self, + _ctx: &ExecutorContext, + _pid: &str, + ) -> common_meta::error::Result { + common_meta::error::UnsupportedSnafu { + operation: "query_procedure_state", + } + .fail() + } + + async fn list_procedures( + &self, + _ctx: &ExecutorContext, + ) -> common_meta::error::Result { + common_meta::error::UnsupportedSnafu { + operation: "list_procedures", + } + .fail() + } + } + + fn test_cache_registry( + kv_backend: common_meta::kv_backend::KvBackendRef, + ) -> common_meta::cache::LayeredCacheRegistryRef { + Arc::new( + cache::with_default_composite_cache_registry( + LayeredCacheRegistryBuilder::default() + .add_cache_registry(cache::build_fundamental_cache_registry(kv_backend)), + ) + .unwrap() + .build(), + ) + } + + fn test_table(table_id: u32, table_name: &str) -> table::TableRef { + let schema = Arc::new(GtSchema::new(vec![ + ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ])); + let table_meta = TableMetaBuilder::empty() + .schema(schema) + .primary_key_indices(vec![0]) + .value_indices(vec![1]) + .next_column_id(1024) + .build() + .unwrap(); + let table_info = TableInfoBuilder::new(table_name, table_meta) + .table_id(table_id) + .build() + .unwrap(); + + EmptyTable::from_table_info(&table_info) + } + + async fn test_instance_with_insert_select_interceptor( + interceptor: SqlQueryInterceptorRef, + ) -> Instance { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let process_manager = Arc::new(ProcessManager::new("test-frontend".to_string(), None)); + let catalog_manager = + catalog::memory::MemoryCatalogManager::new_with_table(test_table(1024, "source")); + catalog_manager + .register_table_sync(catalog::RegisterTableRequest { + catalog: "greptime".to_string(), + schema: "public".to_string(), + table_name: "target".to_string(), + table_id: 1025, + table: test_table(1025, "target"), + }) + .unwrap(); + catalog_manager.register_process_list_table(process_manager.clone()); + + let cache_registry = test_cache_registry(kv_backend.clone()); + let plugins = Plugins::new(); + plugins.insert::>(interceptor); + + FrontendBuilder::new( + FrontendOptions::default(), + kv_backend, + cache_registry, + catalog_manager, + Arc::new(client::client_manager::NodeClients::default()), + Arc::new(NoopProcedureExecutor), + process_manager, + ) + .with_plugin(plugins) + .try_build() + .await + .unwrap() + } + + async fn execute_one_sql( + instance: &Instance, + sql: &str, + query_ctx: QueryContextRef, + ) -> Result { + let mut results = instance.do_query_inner(sql, query_ctx).await; + assert_eq!(1, results.len()); + results.remove(0) + } #[test] fn test_fast_legacy_check_deadlock_prevention() { @@ -1315,6 +1552,87 @@ mod tests { ); } + #[test] + fn test_should_track_statement_process() { + assert!(should_track_statement_process(&parse_one_sql( + "SELECT * FROM demo" + ))); + assert!(should_track_statement_process(&parse_one_sql( + "INSERT INTO demo SELECT * FROM source" + ))); + assert!(!should_track_statement_process(&parse_one_sql( + "INSERT INTO demo VALUES (1)" + ))); + assert!(!should_track_statement_process(&parse_one_sql( + "INSERT INTO demo VALUES (now())" + ))); + } + + #[test] + fn test_should_track_plan_process() { + let select_stmt = parse_one_sql("SELECT * FROM demo"); + let insert_select_stmt = parse_one_sql("INSERT INTO demo SELECT * FROM source"); + let insert_values_stmt = parse_one_sql("INSERT INTO demo VALUES (now())"); + + let empty_plan = LogicalPlanBuilder::empty(false).build().unwrap(); + assert!(should_track_plan_process(Some(&select_stmt), &empty_plan)); + assert!(should_track_plan_process( + Some(&insert_select_stmt), + &insert_dml_plan() + )); + assert!(!should_track_plan_process( + Some(&insert_values_stmt), + &insert_dml_plan() + )); + assert!(!should_track_plan_process(None, &insert_dml_plan())); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_insert_select_is_visible_in_show_processlist() { + let insert_sql = "INSERT INTO target SELECT * FROM source"; + let (started_tx, mut started_rx) = mpsc::unbounded_channel(); + let (finish_tx, finish_rx) = oneshot::channel(); + let interceptor = Arc::new(BlockingInsertSelectInterceptor::new(started_tx, finish_rx)); + let instance = Arc::new(test_instance_with_insert_select_interceptor(interceptor).await); + + let insert_task = tokio::spawn({ + let instance = instance.clone(); + async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await } + }); + + tokio::time::timeout(Duration::from_secs(5), started_rx.recv()) + .await + .unwrap() + .unwrap(); + + let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43)) + .await + .unwrap(); + let process_list = output.data.pretty_print().await; + assert!( + process_list.contains(insert_sql), + "process list did not contain running insert:\n{process_list}" + ); + + finish_tx.send(()).unwrap(); + insert_task.await.unwrap().unwrap(); + } + + fn insert_dml_plan() -> LogicalPlan { + let schema = SchemaRef::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + true, + )])); + let target = Arc::new(LogicalTableSource::new(schema)); + let input = LogicalPlanBuilder::empty(false).build().unwrap(); + + LogicalPlanBuilder::insert_into(input, "demo", target, InsertOp::Append) + .unwrap() + .build() + .unwrap() + } + #[test] fn test_exec_validation() { let query_ctx = QueryContext::arc(); diff --git a/src/sql/src/statements/insert.rs b/src/sql/src/statements/insert.rs index a7cae6a52a..cfda63f928 100644 --- a/src/sql/src/statements/insert.rs +++ b/src/sql/src/statements/insert.rs @@ -115,6 +115,17 @@ impl Insert { } } + /// Returns true when the insert source is a query rather than `VALUES`. + pub fn has_non_values_query_source(&self) -> bool { + match &self.inner { + Statement::Insert(SpInsert { + source: Some(box query), + .. + }) => !matches!(&*query.body, SetExpr::Values(_)), + _ => false, + } + } + pub fn query_body(&self) -> Result> { Ok(match &self.inner { Statement::Insert(SpInsert { @@ -321,6 +332,7 @@ mod tests { match stmt { Statement::Insert(insert) => { let q = insert.query_body().unwrap().unwrap(); + assert!(insert.has_non_values_query_source()); assert!(matches!( q.inner, Query { @@ -332,4 +344,38 @@ mod tests { _ => unreachable!(), } } + + #[test] + fn test_has_non_values_query_source() { + let cases = [ + ("INSERT INTO my_table SELECT * FROM other_table", true), + ( + "INSERT INTO my_table WITH cte AS (SELECT * FROM other_table) SELECT * FROM cte", + true, + ), + ( + "INSERT INTO my_table SELECT * FROM t1 UNION ALL SELECT * FROM t2", + true, + ), + ("INSERT INTO my_table VALUES(1)", false), + ("INSERT INTO my_table VALUES(now())", false), + ("INSERT INTO my_table VALUES(1 + 1)", false), + ]; + + for (sql, expected) in cases { + let stmt = ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default(), + ) + .unwrap() + .remove(0); + match stmt { + Statement::Insert(insert) => { + assert_eq!(insert.has_non_values_query_source(), expected, "{sql}"); + } + _ => unreachable!(), + } + } + } }