fix: track INSERT SELECT in process manager (#8138)

* fix: track insert select in process list

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* fix: avoid generic process tracking future

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

---------

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>
This commit is contained in:
QuakeWang
2026-05-21 15:16:22 +08:00
committed by GitHub
parent 59b738d7f2
commit ba679dddfa
3 changed files with 424 additions and 31 deletions

View File

@@ -33,6 +33,8 @@ use table::metadata::{TableId, TableInfoRef};
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::information_schema::InformationSchemaProvider;
#[cfg(any(test, feature = "testing"))]
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::SystemSchemaProvider;
use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest};
@@ -258,6 +260,33 @@ impl MemoryCatalogManager {
manager
}
#[cfg(any(test, feature = "testing"))]
pub fn register_process_list_table(&self, process_manager: ProcessManagerRef) {
let backend = Arc::new(MemoryKvBackend::new());
let manager = Arc::new(self.clone());
let information_schema_provider = InformationSchemaProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
Arc::downgrade(&manager) as Weak<dyn CatalogManager>,
Arc::new(FlowMetadataManager::new(backend.clone())),
Some(process_manager),
backend,
);
let process_list = information_schema_provider
.table(crate::information_schema::PROCESS_LIST)
.unwrap();
self.catalogs
.write()
.unwrap()
.get_mut(DEFAULT_CATALOG_NAME)
.unwrap()
.get_mut(INFORMATION_SCHEMA_NAME)
.unwrap()
.insert(
crate::information_schema::PROCESS_LIST.to_string(),
process_list,
);
}
fn schema_exist_sync(&self, catalog: &str, schema: &str) -> Result<bool> {
Ok(self
.catalogs

View File

@@ -202,21 +202,25 @@ impl Instance {
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor.as_ref();
if stmt.is_readonly() {
let slow_query_timer = self
.slow_query_options
.enable
.then(|| self.event_recorder.clone())
.flatten()
.map(|event_recorder| {
SlowQueryTimer::new(
CatalogQueryStatement::Sql(stmt.clone()),
self.slow_query_options.threshold,
self.slow_query_options.sample_ratio,
self.slow_query_options.record_type,
event_recorder,
)
});
let is_readonly_stmt = stmt.is_readonly();
if should_track_statement_process(&stmt) {
let slow_query_timer = if is_readonly_stmt {
self.slow_query_options
.enable
.then(|| self.event_recorder.clone())
.flatten()
.map(|event_recorder| {
SlowQueryTimer::new(
CatalogQueryStatement::Sql(stmt.clone()),
self.slow_query_options.threshold,
self.slow_query_options.sample_ratio,
self.slow_query_options.record_type,
event_recorder,
)
})
} else {
None
};
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
@@ -654,21 +658,25 @@ impl Instance {
.map(|s| s.to_string())
.unwrap_or_else(|| plan.display_indent().to_string());
let result = if is_readonly_plan(&plan) {
let slow_query_timer = self
.slow_query_options
.enable
.then(|| self.event_recorder.clone())
.flatten()
.map(|event_recorder| {
SlowQueryTimer::new(
CatalogQueryStatement::Plan(query.clone()),
self.slow_query_options.threshold,
self.slow_query_options.sample_ratio,
self.slow_query_options.record_type,
event_recorder,
)
});
let plan_is_readonly = is_readonly_plan(&plan);
let result = if should_track_plan_process(stmt.as_ref(), &plan) {
let slow_query_timer = if plan_is_readonly {
self.slow_query_options
.enable
.then(|| self.event_recorder.clone())
.flatten()
.map(|event_recorder| {
SlowQueryTimer::new(
CatalogQueryStatement::Plan(query.clone()),
self.slow_query_options.threshold,
self.slow_query_options.sample_ratio,
self.slow_query_options.record_type,
event_recorder,
)
})
} else {
None
};
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
@@ -1183,6 +1191,16 @@ fn is_readonly_plan(plan: &LogicalPlan) -> bool {
!matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
}
fn should_track_statement_process(stmt: &Statement) -> bool {
stmt.is_readonly()
|| matches!(stmt, Statement::Insert(insert) if insert.has_non_values_query_source())
}
fn should_track_plan_process(stmt: Option<&Statement>, plan: &LogicalPlan) -> bool {
is_readonly_plan(plan)
|| matches!(stmt, Some(Statement::Insert(insert)) if insert.has_non_values_query_source())
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
@@ -1191,13 +1209,232 @@ mod tests {
use std::thread;
use std::time::{Duration, Instant};
use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
use catalog::process_manager::ProcessManager;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::procedure::{
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
};
use common_query::Output;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{LogicalPlanBuilder, LogicalTableSource};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema as GtSchema};
use query::query_engine::options::QueryOptions;
use session::context::QueryContext;
use session::context::{Channel, ConnInfo, QueryContext, QueryContextBuilder};
use sql::dialect::GreptimeDbDialect;
use strfmt::Format;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::test_util::EmptyTable;
use tokio::sync::{mpsc, oneshot};
use super::*;
use crate::frontend::FrontendOptions;
use crate::instance::builder::FrontendBuilder;
fn parse_one_sql(sql: &str) -> Statement {
parse_stmt(sql, &GreptimeDbDialect {}).unwrap().remove(0)
}
fn test_query_ctx(process_id: u32) -> QueryContextRef {
Arc::new(
QueryContextBuilder::default()
.channel(Channel::Mysql)
.conn_info(ConnInfo::new(None, Channel::Mysql))
.process_id(process_id)
.build(),
)
}
struct BlockingInsertSelectInterceptor {
started_tx: mpsc::UnboundedSender<()>,
finish_rx: std::sync::Mutex<Option<oneshot::Receiver<()>>>,
}
impl BlockingInsertSelectInterceptor {
fn new(started_tx: mpsc::UnboundedSender<()>, finish_rx: oneshot::Receiver<()>) -> Self {
Self {
started_tx,
finish_rx: std::sync::Mutex::new(Some(finish_rx)),
}
}
}
impl SqlQueryInterceptor for BlockingInsertSelectInterceptor {
type Error = Error;
fn pre_execute(
&self,
statement: &Statement,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
let Statement::Insert(insert) = statement else {
return Ok(());
};
if !insert.has_non_values_query_source() {
return Ok(());
}
let finish_rx = self.finish_rx.lock().unwrap().take().unwrap();
let _ = self.started_tx.send(());
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(finish_rx)
.unwrap();
});
Ok(())
}
}
struct NoopProcedureExecutor;
#[async_trait::async_trait]
impl ProcedureExecutor for NoopProcedureExecutor {
async fn submit_ddl_task(
&self,
_ctx: &ExecutorContext,
_request: SubmitDdlTaskRequest,
) -> common_meta::error::Result<SubmitDdlTaskResponse> {
common_meta::error::UnsupportedSnafu {
operation: "submit_ddl_task",
}
.fail()
}
async fn migrate_region(
&self,
_ctx: &ExecutorContext,
_request: MigrateRegionRequest,
) -> common_meta::error::Result<MigrateRegionResponse> {
common_meta::error::UnsupportedSnafu {
operation: "migrate_region",
}
.fail()
}
async fn reconcile(
&self,
_ctx: &ExecutorContext,
_request: ReconcileRequest,
) -> common_meta::error::Result<ReconcileResponse> {
common_meta::error::UnsupportedSnafu {
operation: "reconcile",
}
.fail()
}
async fn query_procedure_state(
&self,
_ctx: &ExecutorContext,
_pid: &str,
) -> common_meta::error::Result<ProcedureStateResponse> {
common_meta::error::UnsupportedSnafu {
operation: "query_procedure_state",
}
.fail()
}
async fn list_procedures(
&self,
_ctx: &ExecutorContext,
) -> common_meta::error::Result<ProcedureDetailResponse> {
common_meta::error::UnsupportedSnafu {
operation: "list_procedures",
}
.fail()
}
}
fn test_cache_registry(
kv_backend: common_meta::kv_backend::KvBackendRef,
) -> common_meta::cache::LayeredCacheRegistryRef {
Arc::new(
cache::with_default_composite_cache_registry(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(cache::build_fundamental_cache_registry(kv_backend)),
)
.unwrap()
.build(),
)
}
fn test_table(table_id: u32, table_name: &str) -> table::TableRef {
let schema = Arc::new(GtSchema::new(vec![
ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
]));
let table_meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices(vec![0])
.value_indices(vec![1])
.next_column_id(1024)
.build()
.unwrap();
let table_info = TableInfoBuilder::new(table_name, table_meta)
.table_id(table_id)
.build()
.unwrap();
EmptyTable::from_table_info(&table_info)
}
async fn test_instance_with_insert_select_interceptor(
interceptor: SqlQueryInterceptorRef<Error>,
) -> Instance {
let kv_backend = Arc::new(MemoryKvBackend::new());
let process_manager = Arc::new(ProcessManager::new("test-frontend".to_string(), None));
let catalog_manager =
catalog::memory::MemoryCatalogManager::new_with_table(test_table(1024, "source"));
catalog_manager
.register_table_sync(catalog::RegisterTableRequest {
catalog: "greptime".to_string(),
schema: "public".to_string(),
table_name: "target".to_string(),
table_id: 1025,
table: test_table(1025, "target"),
})
.unwrap();
catalog_manager.register_process_list_table(process_manager.clone());
let cache_registry = test_cache_registry(kv_backend.clone());
let plugins = Plugins::new();
plugins.insert::<SqlQueryInterceptorRef<Error>>(interceptor);
FrontendBuilder::new(
FrontendOptions::default(),
kv_backend,
cache_registry,
catalog_manager,
Arc::new(client::client_manager::NodeClients::default()),
Arc::new(NoopProcedureExecutor),
process_manager,
)
.with_plugin(plugins)
.try_build()
.await
.unwrap()
}
async fn execute_one_sql(
instance: &Instance,
sql: &str,
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut results = instance.do_query_inner(sql, query_ctx).await;
assert_eq!(1, results.len());
results.remove(0)
}
#[test]
fn test_fast_legacy_check_deadlock_prevention() {
@@ -1315,6 +1552,87 @@ mod tests {
);
}
#[test]
fn test_should_track_statement_process() {
assert!(should_track_statement_process(&parse_one_sql(
"SELECT * FROM demo"
)));
assert!(should_track_statement_process(&parse_one_sql(
"INSERT INTO demo SELECT * FROM source"
)));
assert!(!should_track_statement_process(&parse_one_sql(
"INSERT INTO demo VALUES (1)"
)));
assert!(!should_track_statement_process(&parse_one_sql(
"INSERT INTO demo VALUES (now())"
)));
}
#[test]
fn test_should_track_plan_process() {
let select_stmt = parse_one_sql("SELECT * FROM demo");
let insert_select_stmt = parse_one_sql("INSERT INTO demo SELECT * FROM source");
let insert_values_stmt = parse_one_sql("INSERT INTO demo VALUES (now())");
let empty_plan = LogicalPlanBuilder::empty(false).build().unwrap();
assert!(should_track_plan_process(Some(&select_stmt), &empty_plan));
assert!(should_track_plan_process(
Some(&insert_select_stmt),
&insert_dml_plan()
));
assert!(!should_track_plan_process(
Some(&insert_values_stmt),
&insert_dml_plan()
));
assert!(!should_track_plan_process(None, &insert_dml_plan()));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_insert_select_is_visible_in_show_processlist() {
let insert_sql = "INSERT INTO target SELECT * FROM source";
let (started_tx, mut started_rx) = mpsc::unbounded_channel();
let (finish_tx, finish_rx) = oneshot::channel();
let interceptor = Arc::new(BlockingInsertSelectInterceptor::new(started_tx, finish_rx));
let instance = Arc::new(test_instance_with_insert_select_interceptor(interceptor).await);
let insert_task = tokio::spawn({
let instance = instance.clone();
async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await }
});
tokio::time::timeout(Duration::from_secs(5), started_rx.recv())
.await
.unwrap()
.unwrap();
let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43))
.await
.unwrap();
let process_list = output.data.pretty_print().await;
assert!(
process_list.contains(insert_sql),
"process list did not contain running insert:\n{process_list}"
);
finish_tx.send(()).unwrap();
insert_task.await.unwrap().unwrap();
}
fn insert_dml_plan() -> LogicalPlan {
let schema = SchemaRef::new(Schema::new(vec![Field::new(
"value",
DataType::Int64,
true,
)]));
let target = Arc::new(LogicalTableSource::new(schema));
let input = LogicalPlanBuilder::empty(false).build().unwrap();
LogicalPlanBuilder::insert_into(input, "demo", target, InsertOp::Append)
.unwrap()
.build()
.unwrap()
}
#[test]
fn test_exec_validation() {
let query_ctx = QueryContext::arc();

View File

@@ -115,6 +115,17 @@ impl Insert {
}
}
/// Returns true when the insert source is a query rather than `VALUES`.
pub fn has_non_values_query_source(&self) -> bool {
match &self.inner {
Statement::Insert(SpInsert {
source: Some(box query),
..
}) => !matches!(&*query.body, SetExpr::Values(_)),
_ => false,
}
}
pub fn query_body(&self) -> Result<Option<GtQuery>> {
Ok(match &self.inner {
Statement::Insert(SpInsert {
@@ -321,6 +332,7 @@ mod tests {
match stmt {
Statement::Insert(insert) => {
let q = insert.query_body().unwrap().unwrap();
assert!(insert.has_non_values_query_source());
assert!(matches!(
q.inner,
Query {
@@ -332,4 +344,38 @@ mod tests {
_ => unreachable!(),
}
}
#[test]
fn test_has_non_values_query_source() {
let cases = [
("INSERT INTO my_table SELECT * FROM other_table", true),
(
"INSERT INTO my_table WITH cte AS (SELECT * FROM other_table) SELECT * FROM cte",
true,
),
(
"INSERT INTO my_table SELECT * FROM t1 UNION ALL SELECT * FROM t2",
true,
),
("INSERT INTO my_table VALUES(1)", false),
("INSERT INTO my_table VALUES(now())", false),
("INSERT INTO my_table VALUES(1 + 1)", false),
];
for (sql, expected) in cases {
let stmt = ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default(),
)
.unwrap()
.remove(0);
match stmt {
Statement::Insert(insert) => {
assert_eq!(insert.has_non_values_query_source(), expected, "{sql}");
}
_ => unreachable!(),
}
}
}
}