Skip to main content

frontend/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod dashboard;
17mod grpc;
18mod influxdb;
19mod jaeger;
20mod log_handler;
21mod logs;
22mod opentsdb;
23mod otlp;
24pub mod prom_store;
25mod promql;
26mod region_query;
27pub mod standalone;
28
29use std::pin::Pin;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, atomic};
32use std::time::{Duration, SystemTime};
33
34use async_stream::stream;
35use async_trait::async_trait;
36use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
37use catalog::CatalogManagerRef;
38use catalog::process_manager::{
39    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
40};
41use client::OutputData;
42use common_base::Plugins;
43use common_base::cancellation::CancellableFuture;
44use common_error::ext::{BoxedError, ErrorExt};
45use common_event_recorder::EventRecorderRef;
46use common_meta::cache_invalidator::CacheInvalidatorRef;
47use common_meta::key::TableMetadataManagerRef;
48use common_meta::key::table_name::TableNameKey;
49use common_meta::node_manager::NodeManagerRef;
50use common_meta::procedure_executor::ProcedureExecutorRef;
51use common_query::Output;
52use common_recordbatch::RecordBatchStreamWrapper;
53use common_recordbatch::error::StreamTimeoutSnafu;
54use common_telemetry::logging::SlowQueryOptions;
55use common_telemetry::{debug, error, tracing};
56use dashmap::DashMap;
57use datafusion_expr::LogicalPlan;
58use futures::{Stream, StreamExt};
59use lazy_static::lazy_static;
60use operator::delete::DeleterRef;
61use operator::insert::InserterRef;
62use operator::statement::{StatementExecutor, StatementExecutorRef};
63use partition::manager::PartitionRuleManagerRef;
64use pipeline::pipeline_operator::PipelineOperator;
65use prometheus::HistogramTimer;
66use promql_parser::label::Matcher;
67use query::QueryEngineRef;
68use query::metrics::OnDone;
69use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
70use query::query_engine::DescribeResult;
71use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
72use servers::error::{
73    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
74    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
75};
76use servers::interceptor::{
77    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
78};
79use servers::otlp::metrics::legacy_normalize_otlp_name;
80use servers::prometheus_handler::PrometheusHandler;
81use servers::query_handler::sql::SqlQueryHandler;
82use session::context::{Channel, QueryContextRef};
83use session::table_name::table_idents_to_full_name;
84use snafu::prelude::*;
85use sql::ast::ObjectNamePartExt;
86use sql::dialect::Dialect;
87use sql::parser::{ParseOptions, ParserContext};
88use sql::statements::comment::CommentObject;
89use sql::statements::copy::{CopyDatabase, CopyTable};
90use sql::statements::statement::Statement;
91use sql::statements::tql::Tql;
92use sqlparser::ast::{AnalyzeFormat, ObjectName};
93pub use standalone::StandaloneDatanodeManager;
94use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
95use tracing::Span;
96
97use crate::error::{
98    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
99    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
100    StatementTimeoutSnafu, TableOperationSnafu,
101};
102use crate::service_config::InfluxdbMergeMode;
103use crate::stream_wrapper::CancellableStreamWrapper;
104
105lazy_static! {
106    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
107}
108
109/// The frontend instance contains necessary components, and implements many
110/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
111/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
112#[derive(Clone)]
113pub struct Instance {
114    frontend_peer_addr: String,
115    catalog_manager: CatalogManagerRef,
116    pipeline_operator: Arc<PipelineOperator>,
117    statement_executor: Arc<StatementExecutor>,
118    query_engine: QueryEngineRef,
119    plugins: Plugins,
120    inserter: InserterRef,
121    deleter: DeleterRef,
122    table_metadata_manager: TableMetadataManagerRef,
123    event_recorder: Option<EventRecorderRef>,
124    process_manager: ProcessManagerRef,
125    slow_query_options: SlowQueryOptions,
126    influxdb_default_merge_mode: InfluxdbMergeMode,
127    suspend: Arc<AtomicBool>,
128
129    // cache for otlp metrics
130    // first layer key: db-string
131    // key: direct input metric name
132    // value: if runs in legacy mode
133    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
134}
135
136impl Instance {
137    pub fn frontend_peer_addr(&self) -> &str {
138        &self.frontend_peer_addr
139    }
140
141    pub fn catalog_manager(&self) -> &CatalogManagerRef {
142        &self.catalog_manager
143    }
144
145    pub fn query_engine(&self) -> &QueryEngineRef {
146        &self.query_engine
147    }
148
149    pub fn plugins(&self) -> &Plugins {
150        &self.plugins
151    }
152
153    pub fn statement_executor(&self) -> &StatementExecutorRef {
154        &self.statement_executor
155    }
156
157    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
158        &self.table_metadata_manager
159    }
160
161    pub fn inserter(&self) -> &InserterRef {
162        &self.inserter
163    }
164
165    pub fn process_manager(&self) -> &ProcessManagerRef {
166        &self.process_manager
167    }
168
169    pub fn node_manager(&self) -> &NodeManagerRef {
170        self.inserter.node_manager()
171    }
172
173    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
174        self.inserter.partition_manager()
175    }
176
177    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
178        self.statement_executor.cache_invalidator()
179    }
180
181    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
182        self.statement_executor.procedure_executor()
183    }
184
185    pub fn suspend_state(&self) -> Arc<AtomicBool> {
186        self.suspend.clone()
187    }
188
189    pub(crate) fn is_suspended(&self) -> bool {
190        self.suspend.load(atomic::Ordering::Relaxed)
191    }
192}
193
194fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
195    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
196}
197
198fn validate_analyze_stream_statement(stmt: &mut Statement) -> Result<()> {
199    let Statement::Explain(explain) = stmt else {
200        return InvalidSqlSnafu {
201            err_msg: "only EXPLAIN ANALYZE VERBOSE statement is supported",
202        }
203        .fail();
204    };
205    ensure!(
206        explain.analyze && explain.verbose,
207        InvalidSqlSnafu {
208            err_msg: "statement must be EXPLAIN ANALYZE VERBOSE"
209        }
210    );
211    match explain.format {
212        None | Some(AnalyzeFormat::JSON) => {
213            // Keep explicit FORMAT JSON accepted, but pass JSON through
214            // QueryContext.explain_format instead of the statement to avoid the
215            // planner's current `EXPLAIN VERBOSE with FORMAT` limitation.
216            explain.format = None;
217            Ok(())
218        }
219        Some(_) => InvalidSqlSnafu {
220            err_msg: "only FORMAT JSON is supported for analyze stream",
221        }
222        .fail(),
223    }
224}
225
226impl Instance {
227    fn statement_slow_query_timer(
228        &self,
229        stmt: &Statement,
230        schema_name: String,
231    ) -> Option<SlowQueryTimer> {
232        if !stmt.is_readonly() || !self.slow_query_options.enable {
233            return None;
234        }
235
236        self.event_recorder.clone().map(|event_recorder| {
237            SlowQueryTimer::new(
238                CatalogQueryStatement::Sql(stmt.clone()),
239                schema_name,
240                self.slow_query_options.threshold,
241                self.slow_query_options.sample_ratio,
242                self.slow_query_options.record_type,
243                event_recorder,
244            )
245        })
246    }
247
248    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
249        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
250
251        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
252        let query_interceptor = query_interceptor.as_ref();
253
254        if should_track_statement_process(&stmt) {
255            let catalog_name = query_ctx.current_catalog().to_string();
256            let schema_name = query_ctx.current_schema();
257            let slow_query_timer = self.statement_slow_query_timer(&stmt, schema_name.clone());
258
259            let ticket = self.process_manager.register_query(
260                catalog_name,
261                vec![schema_name],
262                stmt.to_string(),
263                query_ctx.conn_info().to_string(),
264                Some(query_ctx.process_id()),
265                slow_query_timer,
266            );
267
268            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
269
270            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
271                .await
272                .map_err(|_| error::CancelledSnafu.build())?
273                .map(|output| {
274                    let Output { meta, data } = output;
275
276                    let data = match data {
277                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
278                            CancellableStreamWrapper::new(stream, ticket),
279                        )),
280                        other => other,
281                    };
282                    Output { data, meta }
283                })
284        } else {
285            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
286                .await
287        }
288    }
289
290    async fn exec_statement_with_timeout(
291        &self,
292        stmt: Statement,
293        query_ctx: QueryContextRef,
294        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
295    ) -> Result<Output> {
296        let timeout = derive_timeout(&stmt, &query_ctx);
297        match timeout {
298            Some(timeout) => {
299                let start = tokio::time::Instant::now();
300                let output = tokio::time::timeout(
301                    timeout,
302                    self.exec_statement(stmt, query_ctx, query_interceptor),
303                )
304                .await
305                .map_err(|_| StatementTimeoutSnafu.build())??;
306                // compute remaining timeout
307                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
308                attach_timeout(output, remaining_timeout)
309            }
310            None => {
311                self.exec_statement(stmt, query_ctx, query_interceptor)
312                    .await
313            }
314        }
315    }
316
317    async fn exec_statement(
318        &self,
319        stmt: Statement,
320        query_ctx: QueryContextRef,
321        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
322    ) -> Result<Output> {
323        match stmt {
324            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
325                // TODO: remove this when format is supported in datafusion
326                if let Statement::Explain(explain) = &stmt
327                    && let Some(format) = explain.format()
328                {
329                    query_ctx.set_explain_format(format.to_string());
330                }
331
332                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
333                    .await
334            }
335            Statement::Tql(tql) => {
336                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
337                    .await
338            }
339            _ => {
340                query_interceptor.pre_execute(Some(&stmt), None, query_ctx.clone())?;
341                self.statement_executor
342                    .execute_sql(stmt, query_ctx)
343                    .await
344                    .context(TableOperationSnafu)
345            }
346        }
347    }
348
349    async fn plan_and_exec_sql(
350        &self,
351        stmt: Statement,
352        query_ctx: &QueryContextRef,
353        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
354    ) -> Result<Output> {
355        let stmt = QueryStatement::Sql(stmt);
356        let plan = self
357            .statement_executor
358            .plan(&stmt, query_ctx.clone())
359            .await?;
360        let QueryStatement::Sql(stmt) = stmt else {
361            unreachable!()
362        };
363        query_interceptor.pre_execute(Some(&stmt), Some(&plan), query_ctx.clone())?;
364
365        self.statement_executor
366            .exec_plan(plan, query_ctx.clone())
367            .await
368            .context(TableOperationSnafu)
369    }
370
371    async fn plan_and_exec_tql(
372        &self,
373        query_ctx: &QueryContextRef,
374        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
375        tql: Tql,
376    ) -> Result<Output> {
377        let plan = self
378            .statement_executor
379            .plan_tql(tql.clone(), query_ctx)
380            .await?;
381        query_interceptor.pre_execute(
382            Some(&Statement::Tql(tql)),
383            Some(&plan),
384            query_ctx.clone(),
385        )?;
386        self.statement_executor
387            .exec_plan(plan, query_ctx.clone())
388            .await
389            .context(TableOperationSnafu)
390    }
391
392    async fn check_otlp_legacy(
393        &self,
394        names: &[&String],
395        ctx: QueryContextRef,
396    ) -> server_error::Result<bool> {
397        let db_string = ctx.get_db_string();
398        // fast cache check
399        let cache = self
400            .otlp_metrics_table_legacy_cache
401            .entry(db_string.clone())
402            .or_default();
403        if let Some(flag) = fast_legacy_check(&cache, names)? {
404            return Ok(flag);
405        }
406        // release cache reference to avoid lock contention
407        drop(cache);
408
409        let catalog = ctx.current_catalog();
410        let schema = ctx.current_schema();
411
412        // query legacy table names
413        let normalized_names = names
414            .iter()
415            .map(|n| legacy_normalize_otlp_name(n))
416            .collect::<Vec<_>>();
417        let table_names = normalized_names
418            .iter()
419            .map(|n| TableNameKey::new(catalog, &schema, n))
420            .collect::<Vec<_>>();
421        let table_values = self
422            .table_metadata_manager()
423            .table_name_manager()
424            .batch_get(table_names)
425            .await
426            .context(CommonMetaSnafu)?;
427        let table_ids = table_values
428            .into_iter()
429            .filter_map(|v| v.map(|vi| vi.table_id()))
430            .collect::<Vec<_>>();
431
432        // means no existing table is found, use new mode
433        if table_ids.is_empty() {
434            let cache = self
435                .otlp_metrics_table_legacy_cache
436                .entry(db_string)
437                .or_default();
438            names.iter().for_each(|name| {
439                cache.insert((*name).clone(), false);
440            });
441            return Ok(false);
442        }
443
444        // has existing table, check table options
445        let table_infos = self
446            .table_metadata_manager()
447            .table_info_manager()
448            .batch_get(&table_ids)
449            .await
450            .context(CommonMetaSnafu)?;
451        let options = table_infos
452            .values()
453            .map(|info| {
454                info.table_info
455                    .meta
456                    .options
457                    .extra_options
458                    .get(OTLP_METRIC_COMPAT_KEY)
459                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
460            })
461            .collect::<Vec<_>>();
462        let cache = self
463            .otlp_metrics_table_legacy_cache
464            .entry(db_string)
465            .or_default();
466        if !options.is_empty() {
467            // check value consistency
468            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
469            let has_legacy = options
470                .iter()
471                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
472            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
473            let flag = has_legacy;
474            names.iter().for_each(|name| {
475                cache.insert((*name).clone(), flag);
476            });
477            Ok(flag)
478        } else {
479            // no table info, use new mode
480            names.iter().for_each(|name| {
481                cache.insert((*name).clone(), false);
482            });
483            Ok(false)
484        }
485    }
486}
487
488fn fast_legacy_check(
489    cache: &DashMap<String, bool>,
490    names: &[&String],
491) -> server_error::Result<Option<bool>> {
492    let hit_cache = names
493        .iter()
494        .filter_map(|name| cache.get(*name))
495        .collect::<Vec<_>>();
496    if !hit_cache.is_empty() {
497        let hit_legacy = hit_cache.iter().any(|en| *en.value());
498        let hit_prom = hit_cache.iter().any(|en| !*en.value());
499
500        // hit but have true and false, means both legacy and new mode are used
501        // we cannot handle this case, so return error
502        // add doc links in err msg later
503        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
504
505        let flag = hit_legacy;
506        // drop hit_cache to release references before inserting to avoid deadlock
507        drop(hit_cache);
508
509        // set cache for all names
510        names.iter().for_each(|name| {
511            if !cache.contains_key(*name) {
512                cache.insert((*name).clone(), flag);
513            }
514        });
515        Ok(Some(flag))
516    } else {
517        Ok(None)
518    }
519}
520
521/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
522/// For MySQL, it applies only to read-only statements.
523fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
524    let query_timeout = query_ctx.query_timeout()?;
525    if query_timeout.is_zero() {
526        return None;
527    }
528    match query_ctx.channel() {
529        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
530        Channel::Postgres => Some(query_timeout),
531        _ => None,
532    }
533}
534
535/// Derives timeout for plan execution.
536fn derive_timeout_for_plan(plan: &LogicalPlan, query_ctx: &QueryContextRef) -> Option<Duration> {
537    let query_timeout = query_ctx.query_timeout()?;
538    if query_timeout.is_zero() {
539        return None;
540    }
541    match query_ctx.channel() {
542        Channel::Mysql if is_readonly_plan(plan) => Some(query_timeout),
543        Channel::Postgres => Some(query_timeout),
544        _ => None,
545    }
546}
547
548fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
549    if timeout.is_zero() {
550        return StatementTimeoutSnafu.fail();
551    }
552
553    let output = match output.data {
554        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
555        OutputData::Stream(mut stream) => {
556            let schema = stream.schema();
557            let s = Box::pin(stream! {
558                let mut start = tokio::time::Instant::now();
559                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
560                    yield item;
561
562                    let now = tokio::time::Instant::now();
563                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
564                    start = now;
565                    // tokio::time::timeout may not return an error immediately when timeout is 0.
566                    if timeout.is_zero() {
567                        StreamTimeoutSnafu.fail()?;
568                    }
569                }
570            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
571            let stream = RecordBatchStreamWrapper {
572                schema,
573                stream: s,
574                output_ordering: None,
575                metrics: Default::default(),
576                span: Span::current(),
577            };
578            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
579        }
580    };
581
582    Ok(output)
583}
584
585impl Instance {
586    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_analyze_stream_query")]
587    async fn do_analyze_stream_query_inner(
588        &self,
589        query: &str,
590        query_ctx: QueryContextRef,
591    ) -> Result<Output> {
592        ensure!(!self.is_suspended(), error::SuspendedSnafu);
593
594        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
595        let query_interceptor = query_interceptor_opt.as_ref();
596        let query = query_interceptor.pre_parsing(query, query_ctx.clone())?;
597        let mut stmts = parse_stmt(query.as_ref(), query_ctx.sql_dialect())
598            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))?;
599
600        ensure!(
601            stmts.len() == 1,
602            InvalidSqlSnafu {
603                err_msg: "only single EXPLAIN ANALYZE VERBOSE statement is supported"
604            }
605        );
606        let mut stmt = stmts.remove(0);
607        validate_analyze_stream_statement(&mut stmt)?;
608        query_ctx.set_explain_format(AnalyzeFormat::JSON.to_string());
609
610        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
611        checker_ref
612            .as_ref()
613            .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
614            .context(PermissionSnafu)?;
615        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
616        let catalog_name = query_ctx.current_catalog().to_string();
617        let schema_name = query_ctx.current_schema();
618        let slow_query_timer = self.statement_slow_query_timer(&stmt, schema_name.clone());
619        let ticket = self.process_manager.register_query(
620            catalog_name,
621            vec![schema_name],
622            stmt.to_string(),
623            query_ctx.conn_info().to_string(),
624            Some(query_ctx.process_id()),
625            slow_query_timer,
626        );
627        let query_fut =
628            self.exec_statement_with_timeout(stmt, query_ctx.clone(), query_interceptor);
629        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
630            .await
631            .map_err(|_| error::CancelledSnafu.build())??;
632        let Output { meta, data } = output;
633        let data = match data {
634            OutputData::Stream(stream) => OutputData::Stream(Box::pin(
635                CancellableStreamWrapper::new_cancel_on_drop(stream, ticket),
636            )),
637            other => other,
638        };
639        query_interceptor.post_execute(Output { data, meta }, query_ctx)
640    }
641
642    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
643    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
644        if self.is_suspended() {
645            return vec![error::SuspendedSnafu {}.fail()];
646        }
647
648        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
649        let query_interceptor = query_interceptor_opt.as_ref();
650        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
651            Ok(q) => q,
652            Err(e) => return vec![Err(e)],
653        };
654
655        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
656        let checker = checker_ref.as_ref();
657
658        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
659            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
660        {
661            Ok(stmts) => {
662                if stmts.is_empty() {
663                    return vec![
664                        InvalidSqlSnafu {
665                            err_msg: "empty statements",
666                        }
667                        .fail(),
668                    ];
669                }
670
671                let mut results = Vec::with_capacity(stmts.len());
672                for stmt in stmts {
673                    if let Err(e) = checker
674                        .check_permission(
675                            query_ctx.current_user(),
676                            PermissionReq::SqlStatement(&stmt),
677                        )
678                        .context(PermissionSnafu)
679                    {
680                        results.push(Err(e));
681                        break;
682                    }
683
684                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
685                        Ok(output) => {
686                            let output_result =
687                                query_interceptor.post_execute(output, query_ctx.clone());
688                            results.push(output_result);
689                        }
690                        Err(e) => {
691                            if e.status_code().should_log_error() {
692                                error!(e; "Failed to execute query: {stmt}");
693                            } else {
694                                debug!("Failed to execute query: {stmt}, {e}");
695                            }
696                            results.push(Err(e));
697                            break;
698                        }
699                    }
700                }
701                results
702            }
703            Err(e) => {
704                vec![Err(e)]
705            }
706        }
707    }
708
709    async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
710        self.query_engine
711            .execute(plan, query_ctx)
712            .await
713            .context(ExecLogicalPlanSnafu)
714    }
715
716    async fn exec_plan_with_timeout(
717        &self,
718        plan: LogicalPlan,
719        query_ctx: QueryContextRef,
720    ) -> Result<Output> {
721        let timeout = derive_timeout_for_plan(&plan, &query_ctx);
722        match timeout {
723            Some(timeout) => {
724                let start = tokio::time::Instant::now();
725                let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
726                    .await
727                    .map_err(|_| StatementTimeoutSnafu.build())??;
728                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
729                attach_timeout(output, remaining_timeout)
730            }
731            None => self.exec_plan(plan, query_ctx).await,
732        }
733    }
734
735    async fn do_exec_plan_inner(
736        &self,
737        plan: LogicalPlan,
738        stmt: Option<Statement>,
739        query_ctx: QueryContextRef,
740    ) -> Result<Output> {
741        ensure!(!self.is_suspended(), error::SuspendedSnafu);
742
743        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
744        let query_interceptor = query_interceptor_opt.as_ref();
745
746        query_interceptor.pre_execute(stmt.as_ref(), Some(&plan), query_ctx.clone())?;
747
748        let query = stmt
749            .as_ref()
750            .map(|s| s.to_string())
751            .unwrap_or_else(|| plan.display_indent().to_string());
752
753        let plan_is_readonly = is_readonly_plan(&plan);
754        let result = if should_track_plan_process(stmt.as_ref(), &plan) {
755            let catalog_name = query_ctx.current_catalog().to_string();
756            let schema_name = query_ctx.current_schema();
757            let slow_query_timer = if plan_is_readonly {
758                self.slow_query_options
759                    .enable
760                    .then(|| self.event_recorder.clone())
761                    .flatten()
762                    .map(|event_recorder| {
763                        SlowQueryTimer::new(
764                            CatalogQueryStatement::Plan(query.clone()),
765                            schema_name.clone(),
766                            self.slow_query_options.threshold,
767                            self.slow_query_options.sample_ratio,
768                            self.slow_query_options.record_type,
769                            event_recorder,
770                        )
771                    })
772            } else {
773                None
774            };
775
776            let ticket = self.process_manager.register_query(
777                catalog_name,
778                vec![schema_name],
779                query,
780                query_ctx.conn_info().to_string(),
781                Some(query_ctx.process_id()),
782                slow_query_timer,
783            );
784
785            let query_fut = self.exec_plan_with_timeout(plan, query_ctx.clone());
786
787            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
788                .await
789                .map_err(|_| error::CancelledSnafu.build())?
790                .map(|output| {
791                    let Output { meta, data } = output;
792
793                    let data = match data {
794                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
795                            CancellableStreamWrapper::new(stream, ticket),
796                        )),
797                        other => other,
798                    };
799                    Output { data, meta }
800                })
801        } else {
802            self.exec_plan_with_timeout(plan, query_ctx.clone()).await
803        };
804
805        result.and_then(|output| query_interceptor.post_execute(output, query_ctx))
806    }
807
808    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
809    async fn do_promql_query_inner(
810        &self,
811        query: &PromQuery,
812        query_ctx: QueryContextRef,
813    ) -> Vec<Result<Output>> {
814        if self.is_suspended() {
815            return vec![error::SuspendedSnafu {}.fail()];
816        }
817
818        // check will be done in prometheus handler's do_query
819        let result = PrometheusHandler::do_query(self, query, query_ctx)
820            .await
821            .with_context(|_| ExecutePromqlSnafu {
822                query: format!("{query:?}"),
823            });
824        vec![result]
825    }
826
827    async fn do_describe_inner(
828        &self,
829        stmt: Statement,
830        query_ctx: QueryContextRef,
831    ) -> Result<Option<DescribeResult>> {
832        ensure!(!self.is_suspended(), error::SuspendedSnafu);
833
834        // EXPLAIN / EXPLAIN ANALYZE wrap an inner statement; describe them when the
835        // wrapped statement is something we already plan (so that bind parameters
836        // in the inner query get their types inferred). See #8029.
837        let is_inner_plannable = |s: &Statement| {
838            matches!(
839                s,
840                Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
841            )
842        };
843        let plannable = is_inner_plannable(&stmt)
844            || matches!(&stmt, Statement::Explain(explain) if is_inner_plannable(explain.statement.as_ref()));
845
846        if plannable {
847            self.plugins
848                .get::<PermissionCheckerRef>()
849                .as_ref()
850                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
851                .context(PermissionSnafu)?;
852
853            let plan = self
854                .query_engine
855                .planner()
856                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
857                .await
858                .context(PlanStatementSnafu)?;
859            self.query_engine
860                .describe(plan, query_ctx)
861                .await
862                .map(Some)
863                .context(error::DescribeStatementSnafu)
864        } else {
865            Ok(None)
866        }
867    }
868
869    async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
870        self.catalog_manager
871            .schema_exists(catalog, schema, None)
872            .await
873            .context(error::CatalogSnafu)
874    }
875}
876
877#[async_trait]
878impl SqlQueryHandler for Instance {
879    async fn do_query(
880        &self,
881        query: &str,
882        query_ctx: QueryContextRef,
883    ) -> Vec<server_error::Result<Output>> {
884        self.do_query_inner(query, query_ctx)
885            .await
886            .into_iter()
887            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
888            .collect()
889    }
890
891    async fn do_analyze_stream_query(
892        &self,
893        query: &str,
894        query_ctx: QueryContextRef,
895    ) -> server_error::Result<Output> {
896        self.do_analyze_stream_query_inner(query, query_ctx)
897            .await
898            .map_err(BoxedError::new)
899            .context(ExecuteQuerySnafu)
900    }
901
902    async fn do_exec_plan(
903        &self,
904        plan: LogicalPlan,
905        stmt: Option<Statement>,
906        query_ctx: QueryContextRef,
907    ) -> server_error::Result<Output> {
908        self.do_exec_plan_inner(plan, stmt, query_ctx)
909            .await
910            .map_err(BoxedError::new)
911            .context(server_error::ExecutePlanSnafu)
912    }
913
914    async fn do_promql_query(
915        &self,
916        query: &PromQuery,
917        query_ctx: QueryContextRef,
918    ) -> Vec<server_error::Result<Output>> {
919        self.do_promql_query_inner(query, query_ctx)
920            .await
921            .into_iter()
922            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
923            .collect()
924    }
925
926    async fn do_describe(
927        &self,
928        stmt: Statement,
929        query_ctx: QueryContextRef,
930    ) -> server_error::Result<Option<DescribeResult>> {
931        self.do_describe_inner(stmt, query_ctx)
932            .await
933            .map_err(BoxedError::new)
934            .context(server_error::DescribeStatementSnafu)
935    }
936
937    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
938        self.is_valid_schema_inner(catalog, schema)
939            .await
940            .map_err(BoxedError::new)
941            .context(server_error::CheckDatabaseValiditySnafu)
942    }
943}
944
945/// Attaches a timer to the output and observes it once the output is exhausted.
946pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
947    match output.data {
948        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
949        OutputData::Stream(stream) => {
950            let stream = OnDone::new(stream, move || {
951                timer.observe_duration();
952            });
953            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
954        }
955    }
956}
957
958#[async_trait]
959impl PrometheusHandler for Instance {
960    #[tracing::instrument(skip_all)]
961    async fn do_query(
962        &self,
963        query: &PromQuery,
964        query_ctx: QueryContextRef,
965    ) -> server_error::Result<Output> {
966        let interceptor = self
967            .plugins
968            .get::<PromQueryInterceptorRef<server_error::Error>>();
969
970        self.plugins
971            .get::<PermissionCheckerRef>()
972            .as_ref()
973            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
974            .context(AuthSnafu)?;
975
976        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
977            ParsePromQLSnafu {
978                query: query.clone(),
979            }
980        })?;
981
982        let plan = self
983            .statement_executor
984            .plan(&stmt, query_ctx.clone())
985            .await
986            .map_err(BoxedError::new)
987            .context(ExecuteQuerySnafu)?;
988
989        let QueryStatement::Promql(eval_stmt, _) = &stmt else {
990            unreachable!("query is parsed from promql");
991        };
992
993        interceptor.pre_execute(query, &eval_stmt.expr, Some(&plan), query_ctx.clone())?;
994
995        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
996        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
997            CatalogQueryStatement::Promql(eval_stmt, alias)
998        } else {
999            // It should not happen since the query is already parsed successfully.
1000            return UnexpectedResultSnafu {
1001                reason: "The query should always be promql.".to_string(),
1002            }
1003            .fail();
1004        };
1005        let raw_query = query_statement.to_string();
1006
1007        let slow_query_timer = self
1008            .slow_query_options
1009            .enable
1010            .then(|| self.event_recorder.clone())
1011            .flatten()
1012            .map(|event_recorder| {
1013                SlowQueryTimer::new(
1014                    query_statement,
1015                    query_ctx.current_schema(),
1016                    self.slow_query_options.threshold,
1017                    self.slow_query_options.sample_ratio,
1018                    self.slow_query_options.record_type,
1019                    event_recorder,
1020                )
1021            });
1022
1023        let ticket = self.process_manager.register_query(
1024            query_ctx.current_catalog().to_string(),
1025            vec![query_ctx.current_schema()],
1026            raw_query,
1027            query_ctx.conn_info().to_string(),
1028            Some(query_ctx.process_id()),
1029            slow_query_timer,
1030        );
1031
1032        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
1033
1034        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
1035            .await
1036            .map_err(|_| servers::error::CancelledSnafu.build())?
1037            .map(|output| {
1038                let Output { meta, data } = output;
1039                let data = match data {
1040                    OutputData::Stream(stream) => {
1041                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
1042                    }
1043                    other => other,
1044                };
1045                Output { data, meta }
1046            })
1047            .map_err(BoxedError::new)
1048            .context(ExecuteQuerySnafu)?;
1049
1050        Ok(interceptor.post_execute(output, query_ctx)?)
1051    }
1052
1053    async fn query_metric_names(
1054        &self,
1055        matchers: Vec<Matcher>,
1056        ctx: &QueryContextRef,
1057    ) -> server_error::Result<Vec<String>> {
1058        self.handle_query_metric_names(matchers, ctx)
1059            .await
1060            .map_err(BoxedError::new)
1061            .context(ExecuteQuerySnafu)
1062    }
1063
1064    async fn query_label_values(
1065        &self,
1066        metric: String,
1067        label_name: String,
1068        matchers: Vec<Matcher>,
1069        start: SystemTime,
1070        end: SystemTime,
1071        ctx: &QueryContextRef,
1072    ) -> server_error::Result<Vec<String>> {
1073        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
1074            .await
1075            .map_err(BoxedError::new)
1076            .context(ExecuteQuerySnafu)
1077    }
1078
1079    fn catalog_manager(&self) -> CatalogManagerRef {
1080        self.catalog_manager.clone()
1081    }
1082}
1083
/// Validates access to the database named by `stmt.database`, if it is present.
///
/// Expands to a check of the optional `database` field against the current catalog
/// of the query context. A rejection is boxed, wrapped in `SqlExecIntercepted` and
/// propagated with `?`, so the macro must be used inside a function that returns
/// the crate's `Result`. Without an explicit database the expansion does nothing.
macro_rules! validate_db_permission {
    ($statement:expr, $ctx:expr) => {
        if let Some(db_name) = &$statement.database {
            validate_catalog_and_schema($ctx.current_catalog(), db_name, $ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
1094
1095pub fn check_permission(
1096    plugins: Plugins,
1097    stmt: &Statement,
1098    query_ctx: &QueryContextRef,
1099) -> Result<()> {
1100    let need_validate = plugins
1101        .get::<QueryOptions>()
1102        .map(|opts| opts.disallow_cross_catalog_query)
1103        .unwrap_or_default();
1104
1105    if !need_validate {
1106        return Ok(());
1107    }
1108
1109    match stmt {
1110        // Will be checked in execution.
1111        // TODO(dennis): add a hook for admin commands.
1112        Statement::Admin(_) => {}
1113        // These are executed by query engine, and will be checked there.
1114        Statement::Query(_)
1115        | Statement::Explain(_)
1116        | Statement::Tql(_)
1117        | Statement::Delete(_)
1118        | Statement::DeclareCursor(_)
1119        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
1120        // database ops won't be checked
1121        Statement::CreateDatabase(_)
1122        | Statement::ShowDatabases(_)
1123        | Statement::DropDatabase(_)
1124        | Statement::AlterDatabase(_)
1125        | Statement::DropFlow(_)
1126        | Statement::Use(_) => {}
1127        #[cfg(feature = "enterprise")]
1128        Statement::DropTrigger(_) => {}
1129        Statement::ShowCreateDatabase(stmt) => {
1130            validate_database(&stmt.database_name, query_ctx)?;
1131        }
1132        Statement::ShowCreateTable(stmt) => {
1133            validate_param(&stmt.table_name, query_ctx)?;
1134        }
1135        Statement::ShowCreateFlow(stmt) => {
1136            validate_flow(&stmt.flow_name, query_ctx)?;
1137        }
1138        #[cfg(feature = "enterprise")]
1139        Statement::ShowCreateTrigger(stmt) => {
1140            validate_param(&stmt.trigger_name, query_ctx)?;
1141        }
1142        Statement::ShowCreateView(stmt) => {
1143            validate_param(&stmt.view_name, query_ctx)?;
1144        }
1145        Statement::CreateExternalTable(stmt) => {
1146            validate_param(&stmt.name, query_ctx)?;
1147        }
1148        Statement::CreateFlow(stmt) => {
1149            // TODO: should also validate source table name here?
1150            validate_param(&stmt.sink_table_name, query_ctx)?;
1151        }
1152        #[cfg(feature = "enterprise")]
1153        Statement::CreateTrigger(stmt) => {
1154            validate_param(&stmt.trigger_name, query_ctx)?;
1155        }
1156        Statement::CreateView(stmt) => {
1157            validate_param(&stmt.name, query_ctx)?;
1158        }
1159        Statement::AlterTable(stmt) => {
1160            validate_param(stmt.table_name(), query_ctx)?;
1161        }
1162        #[cfg(feature = "enterprise")]
1163        Statement::AlterTrigger(_) => {}
1164        // set/show variable now only alter/show variable in session
1165        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
1166        // show charset and show collation won't be checked
1167        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
1168
1169        Statement::Comment(comment) => match &comment.object {
1170            CommentObject::Table(table) => validate_param(table, query_ctx)?,
1171            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
1172            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
1173        },
1174
1175        Statement::Insert(insert) => {
1176            let name = insert.table_name().context(ParseSqlSnafu)?;
1177            validate_param(name, query_ctx)?;
1178        }
1179        Statement::CreateTable(stmt) => {
1180            validate_param(&stmt.name, query_ctx)?;
1181        }
1182        Statement::CreateTableLike(stmt) => {
1183            validate_param(&stmt.table_name, query_ctx)?;
1184            validate_param(&stmt.source_name, query_ctx)?;
1185        }
1186        Statement::DropTable(drop_stmt) => {
1187            for table_name in drop_stmt.table_names() {
1188                validate_param(table_name, query_ctx)?;
1189            }
1190        }
1191        Statement::DropView(stmt) => {
1192            validate_param(&stmt.view_name, query_ctx)?;
1193        }
1194        Statement::ShowTables(stmt) => {
1195            validate_db_permission!(stmt, query_ctx);
1196        }
1197        Statement::ShowTableStatus(stmt) => {
1198            validate_db_permission!(stmt, query_ctx);
1199        }
1200        Statement::ShowColumns(stmt) => {
1201            validate_db_permission!(stmt, query_ctx);
1202        }
1203        Statement::ShowIndex(stmt) => {
1204            validate_db_permission!(stmt, query_ctx);
1205        }
1206        Statement::ShowRegion(stmt) => {
1207            validate_db_permission!(stmt, query_ctx);
1208        }
1209        Statement::ShowViews(stmt) => {
1210            validate_db_permission!(stmt, query_ctx);
1211        }
1212        Statement::ShowFlows(stmt) => {
1213            validate_db_permission!(stmt, query_ctx);
1214        }
1215        #[cfg(feature = "enterprise")]
1216        Statement::ShowTriggers(_stmt) => {
1217            // The trigger is organized based on the catalog dimension, so there
1218            // is no need to check the permission of the database(schema).
1219        }
1220        Statement::ShowStatus(_stmt) => {}
1221        Statement::ShowSearchPath(_stmt) => {}
1222        Statement::DescribeTable(stmt) => {
1223            validate_param(stmt.name(), query_ctx)?;
1224        }
1225        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
1226            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
1227            CopyTable::From(copy_table_from) => {
1228                validate_param(&copy_table_from.table_name, query_ctx)?
1229            }
1230        },
1231        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
1232            match copy_database {
1233                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1234                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1235            }
1236        }
1237        Statement::TruncateTable(stmt) => {
1238            validate_param(stmt.table_name(), query_ctx)?;
1239        }
1240        // cursor operations are always allowed once it's created
1241        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
1242        // User can only kill process in their own catalog.
1243        Statement::Kill(_) => {}
1244        // SHOW PROCESSLIST
1245        Statement::ShowProcesslist(_) => {}
1246    }
1247    Ok(())
1248}
1249
1250fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1251    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1252        .map_err(BoxedError::new)
1253        .context(ExternalSnafu)?;
1254
1255    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1256        .map_err(BoxedError::new)
1257        .context(SqlExecInterceptedSnafu)
1258}
1259
1260fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1261    let catalog = match &name.0[..] {
1262        [_flow] => query_ctx.current_catalog().to_string(),
1263        [catalog, _flow] => catalog.to_string_unquoted(),
1264        _ => {
1265            return InvalidSqlSnafu {
1266                err_msg: format!(
1267                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1268                ),
1269            }
1270            .fail();
1271        }
1272    };
1273
1274    let schema = query_ctx.current_schema();
1275
1276    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1277        .map_err(BoxedError::new)
1278        .context(SqlExecInterceptedSnafu)
1279}
1280
1281fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1282    let (catalog, schema) = match &name.0[..] {
1283        [schema] => (
1284            query_ctx.current_catalog().to_string(),
1285            schema.to_string_unquoted(),
1286        ),
1287        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1288        _ => InvalidSqlSnafu {
1289            err_msg: format!(
1290                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1291            ),
1292        }
1293        .fail()?,
1294    };
1295
1296    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1297        .map_err(BoxedError::new)
1298        .context(SqlExecInterceptedSnafu)
1299}
1300
1301fn is_readonly_plan(plan: &LogicalPlan) -> bool {
1302    !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
1303}
1304
1305fn should_track_statement_process(stmt: &Statement) -> bool {
1306    stmt.is_readonly()
1307        || matches!(stmt, Statement::Insert(insert) if insert.has_non_values_query_source())
1308}
1309
1310fn should_track_plan_process(stmt: Option<&Statement>, plan: &LogicalPlan) -> bool {
1311    is_readonly_plan(plan)
1312        || matches!(stmt, Some(Statement::Insert(insert)) if insert.has_non_values_query_source())
1313}
1314
1315#[cfg(test)]
1316mod tests {
1317    use std::collections::HashMap;
1318    use std::future::Future;
1319    use std::pin::Pin;
1320    use std::sync::atomic::{AtomicBool, Ordering};
1321    use std::sync::{Arc, Barrier};
1322    use std::task::{Context, Poll};
1323    use std::thread;
1324    use std::time::{Duration, Instant};
1325
1326    use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
1327    use catalog::process_manager::ProcessManager;
1328    use common_base::Plugins;
1329    use common_error::ext::{BoxedError, PlainError};
1330    use common_error::status_code::StatusCode;
1331    use common_meta::cache::LayeredCacheRegistryBuilder;
1332    use common_meta::kv_backend::memory::MemoryKvBackend;
1333    use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
1334    use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
1335    use common_meta::rpc::procedure::{
1336        MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
1337    };
1338    use common_query::Output;
1339    use common_recordbatch::{
1340        OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream,
1341    };
1342    use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1343    use datafusion_expr::dml::InsertOp;
1344    use datafusion_expr::{LogicalPlanBuilder, LogicalTableSource};
1345    use datatypes::prelude::ConcreteDataType;
1346    use datatypes::schema::{ColumnSchema, Schema as GtSchema, SchemaRef as GtSchemaRef};
1347    use query::query_engine::options::QueryOptions;
1348    use session::context::{Channel, ConnInfo, QueryContext, QueryContextBuilder};
1349    use snafu::{Location, Snafu};
1350    use sql::dialect::GreptimeDbDialect;
1351    use store_api::data_source::DataSource;
1352    use store_api::storage::ScanRequest;
1353    use strfmt::Format;
1354    use table::metadata::{FilterPushDownType, TableInfo, TableInfoBuilder, TableMetaBuilder};
1355    use table::test_util::EmptyTable;
1356    use table::{Table, TableRef};
1357    use tokio::sync::{mpsc, oneshot};
1358
1359    use super::*;
1360    use crate::frontend::FrontendOptions;
1361    use crate::instance::builder::FrontendBuilder;
1362
1363    fn parse_test_sql(sql: &str) -> Vec<Statement> {
1364        parse_stmt(sql, &GreptimeDbDialect {}).unwrap()
1365    }
1366
1367    #[test]
1368    fn test_validate_analyze_stream_statement_strictness() {
1369        for sql in [
1370            "select 1",
1371            "explain analyze select 1",
1372            "explain analyze verbose format text select 1",
1373            "explain analyze verbose format graphviz select 1",
1374        ] {
1375            let mut stmts = parse_test_sql(sql);
1376            assert!(
1377                validate_analyze_stream_statement(&mut stmts[0]).is_err(),
1378                "{sql}"
1379            );
1380        }
1381
1382        for sql in [
1383            "explain analyze verbose select 1",
1384            "explain analyze verbose format json select 1",
1385        ] {
1386            let mut stmts = parse_test_sql(sql);
1387            assert!(
1388                validate_analyze_stream_statement(&mut stmts[0]).is_ok(),
1389                "{sql}"
1390            );
1391            let Statement::Explain(explain) = &stmts[0] else {
1392                unreachable!();
1393            };
1394            assert!(explain.format.is_none());
1395        }
1396
1397        assert_eq!(
1398            parse_test_sql("explain analyze verbose select 1; select 2").len(),
1399            2
1400        );
1401    }
1402
    /// Errors reported by the test helpers of this module.
    ///
    /// Each variant records the code location implicitly. Variants wrapping an
    /// underlying failure keep it in `source`.
    #[derive(Debug, Snafu)]
    enum TestError {
        // --- Fixture construction ---
        #[snafu(display("Failed to build test cache registry"))]
        BuildCacheRegistry {
            source: cache::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to build test table meta for table: {table_name}"))]
        BuildTableMeta {
            table_name: String,
            source: table::metadata::TableMetaBuilderError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to build test table info for table: {table_name}"))]
        BuildTableInfo {
            table_name: String,
            source: table::metadata::TableInfoBuilderError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to register test table: {table_name}"))]
        RegisterTable {
            table_name: String,
            source: catalog::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to build test frontend instance"))]
        BuildFrontend {
            source: crate::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        // --- SQL execution ---
        #[snafu(display("Expected exactly one output for SQL `{sql}`, got {actual}"))]
        UnexpectedOutputCount {
            sql: String,
            actual: usize,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to execute SQL `{sql}`"))]
        ExecuteSql {
            sql: String,
            source: crate::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        // --- Insert-select coordination (timeouts, closed channels, task results) ---
        #[snafu(display("Timed out waiting for insert-select start notification"))]
        InsertStartTimeout {
            source: tokio::time::error::Elapsed,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Insert-select start notification channel closed"))]
        InsertStartChannelClosed {
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to release blocking insert-select interceptor"))]
        ReleaseBlockedInsert {
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Timed out waiting for insert-select source to be polled"))]
        SourcePollTimeout {
            source: tokio::time::error::Elapsed,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Insert-select source poll notification channel closed"))]
        SourcePollChannelClosed {
            source: oneshot::error::RecvError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Timed out waiting for insert task to finish"))]
        InsertTaskTimeout {
            source: tokio::time::error::Elapsed,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Insert task panicked"))]
        InsertTaskPanic {
            source: tokio::task::JoinError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Expected insert-select to be cancelled"))]
        InsertSelectNotCancelled {
            #[snafu(implicit)]
            location: Location,
        },
    }

    /// Result alias for test helpers that fail with [`TestError`].
    type TestResult<T> = std::result::Result<T, TestError>;
1514
1515    fn parse_one_sql(sql: &str) -> Statement {
1516        parse_stmt(sql, &GreptimeDbDialect {}).unwrap().remove(0)
1517    }
1518
1519    fn test_query_ctx(process_id: u32) -> QueryContextRef {
1520        Arc::new(
1521            QueryContextBuilder::default()
1522                .channel(Channel::Mysql)
1523                .conn_info(ConnInfo::new(None, Channel::Mysql))
1524                .process_id(process_id)
1525                .build(),
1526        )
1527    }
1528
1529    struct BlockingInsertSelectInterceptor {
1530        started_tx: mpsc::UnboundedSender<()>,
1531        finish_rx: std::sync::Mutex<Option<oneshot::Receiver<()>>>,
1532    }
1533
1534    impl BlockingInsertSelectInterceptor {
1535        fn new(started_tx: mpsc::UnboundedSender<()>, finish_rx: oneshot::Receiver<()>) -> Self {
1536            Self {
1537                started_tx,
1538                finish_rx: std::sync::Mutex::new(Some(finish_rx)),
1539            }
1540        }
1541    }
1542
1543    impl SqlQueryInterceptor for BlockingInsertSelectInterceptor {
1544        type Error = Error;
1545
1546        fn pre_execute(
1547            &self,
1548            statement: Option<&Statement>,
1549            _plan: Option<&LogicalPlan>,
1550            _query_ctx: QueryContextRef,
1551        ) -> Result<()> {
1552            let Some(Statement::Insert(insert)) = statement else {
1553                return Ok(());
1554            };
1555            if !insert.has_non_values_query_source() {
1556                return Ok(());
1557            }
1558
1559            let finish_rx = self.finish_rx.lock().unwrap().take().unwrap();
1560            let _ = self.started_tx.send(());
1561            tokio::task::block_in_place(|| {
1562                tokio::runtime::Handle::current()
1563                    .block_on(finish_rx)
1564                    .unwrap();
1565            });
1566            Ok(())
1567        }
1568    }
1569
1570    struct PendingRecordBatchStream {
1571        schema: GtSchemaRef,
1572        polled_tx: Option<oneshot::Sender<()>>,
1573        _finish_tx: oneshot::Sender<()>,
1574        finish_rx: Pin<Box<oneshot::Receiver<()>>>,
1575    }
1576
1577    impl RecordBatchStream for PendingRecordBatchStream {
1578        fn schema(&self) -> GtSchemaRef {
1579            self.schema.clone()
1580        }
1581
1582        fn output_ordering(&self) -> Option<&[OrderOption]> {
1583            None
1584        }
1585
1586        fn metrics(&self) -> Option<common_recordbatch::adapter::RecordBatchMetrics> {
1587            None
1588        }
1589    }
1590
1591    impl Stream for PendingRecordBatchStream {
1592        type Item = common_recordbatch::error::Result<RecordBatch>;
1593
1594        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1595            if let Some(polled_tx) = self.polled_tx.take() {
1596                let _ = polled_tx.send(());
1597            }
1598
1599            match self.finish_rx.as_mut().poll(cx) {
1600                Poll::Ready(_) => Poll::Ready(None),
1601                Poll::Pending => Poll::Pending,
1602            }
1603        }
1604    }
1605
1606    impl Unpin for PendingRecordBatchStream {}
1607
1608    struct PendingDataSource {
1609        schema: GtSchemaRef,
1610        polled_tx: std::sync::Mutex<Option<oneshot::Sender<()>>>,
1611    }
1612
1613    impl DataSource for PendingDataSource {
1614        fn get_stream(
1615            &self,
1616            _request: ScanRequest,
1617        ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
1618            let (finish_tx, finish_rx) = oneshot::channel();
1619            let mut polled_tx = self.polled_tx.lock().map_err(|_| {
1620                BoxedError::new(PlainError::new(
1621                    "pending data source lock poisoned".to_string(),
1622                    StatusCode::Unexpected,
1623                ))
1624            })?;
1625            Ok(Box::pin(PendingRecordBatchStream {
1626                schema: self.schema.clone(),
1627                polled_tx: polled_tx.take(),
1628                _finish_tx: finish_tx,
1629                finish_rx: Box::pin(finish_rx),
1630            }))
1631        }
1632    }
1633
1634    struct NoopProcedureExecutor;
1635
1636    #[async_trait::async_trait]
1637    impl ProcedureExecutor for NoopProcedureExecutor {
1638        async fn submit_ddl_task(
1639            &self,
1640            _ctx: &ExecutorContext,
1641            _request: SubmitDdlTaskRequest,
1642        ) -> common_meta::error::Result<SubmitDdlTaskResponse> {
1643            common_meta::error::UnsupportedSnafu {
1644                operation: "submit_ddl_task",
1645            }
1646            .fail()
1647        }
1648
1649        async fn migrate_region(
1650            &self,
1651            _ctx: &ExecutorContext,
1652            _request: MigrateRegionRequest,
1653        ) -> common_meta::error::Result<MigrateRegionResponse> {
1654            common_meta::error::UnsupportedSnafu {
1655                operation: "migrate_region",
1656            }
1657            .fail()
1658        }
1659
1660        async fn reconcile(
1661            &self,
1662            _ctx: &ExecutorContext,
1663            _request: ReconcileRequest,
1664        ) -> common_meta::error::Result<ReconcileResponse> {
1665            common_meta::error::UnsupportedSnafu {
1666                operation: "reconcile",
1667            }
1668            .fail()
1669        }
1670
1671        async fn query_procedure_state(
1672            &self,
1673            _ctx: &ExecutorContext,
1674            _pid: &str,
1675        ) -> common_meta::error::Result<ProcedureStateResponse> {
1676            common_meta::error::UnsupportedSnafu {
1677                operation: "query_procedure_state",
1678            }
1679            .fail()
1680        }
1681
1682        async fn list_procedures(
1683            &self,
1684            _ctx: &ExecutorContext,
1685        ) -> common_meta::error::Result<ProcedureDetailResponse> {
1686            common_meta::error::UnsupportedSnafu {
1687                operation: "list_procedures",
1688            }
1689            .fail()
1690        }
1691    }
1692
1693    fn test_cache_registry(
1694        kv_backend: common_meta::kv_backend::KvBackendRef,
1695    ) -> TestResult<common_meta::cache::LayeredCacheRegistryRef> {
1696        Ok(Arc::new(
1697            cache::with_default_composite_cache_registry(
1698                LayeredCacheRegistryBuilder::default()
1699                    .add_cache_registry(cache::build_fundamental_cache_registry(kv_backend)),
1700            )
1701            .context(BuildCacheRegistrySnafu)?
1702            .build(),
1703        ))
1704    }
1705
1706    fn test_table_info(table_id: u32, table_name: &str) -> TestResult<TableInfo> {
1707        let schema = Arc::new(GtSchema::new(vec![
1708            ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
1709            ColumnSchema::new(
1710                "ts",
1711                ConcreteDataType::timestamp_millisecond_datatype(),
1712                false,
1713            )
1714            .with_time_index(true),
1715        ]));
1716        let table_meta = TableMetaBuilder::empty()
1717            .schema(schema)
1718            .primary_key_indices(vec![0])
1719            .value_indices(vec![1])
1720            .next_column_id(1024)
1721            .build()
1722            .with_context(|_| BuildTableMetaSnafu {
1723                table_name: table_name.to_string(),
1724            })?;
1725
1726        TableInfoBuilder::new(table_name, table_meta)
1727            .table_id(table_id)
1728            .build()
1729            .with_context(|_| BuildTableInfoSnafu {
1730                table_name: table_name.to_string(),
1731            })
1732    }
1733
1734    fn test_table(table_id: u32, table_name: &str) -> TestResult<table::TableRef> {
1735        let table_info = test_table_info(table_id, table_name)?;
1736        Ok(EmptyTable::from_table_info(&table_info))
1737    }
1738
1739    fn pending_table(
1740        table_id: u32,
1741        table_name: &str,
1742        polled_tx: oneshot::Sender<()>,
1743    ) -> TestResult<table::TableRef> {
1744        let table_info = test_table_info(table_id, table_name)?;
1745        let data_source = Arc::new(PendingDataSource {
1746            schema: table_info.meta.schema.clone(),
1747            polled_tx: std::sync::Mutex::new(Some(polled_tx)),
1748        });
1749
1750        Ok(Arc::new(Table::new(
1751            Arc::new(table_info),
1752            FilterPushDownType::Unsupported,
1753            data_source,
1754        )))
1755    }
1756
1757    async fn test_instance_with_tables(
1758        source_table: TableRef,
1759        target_table: TableRef,
1760    ) -> TestResult<Instance> {
1761        test_instance_with_plugins(source_table, target_table, Plugins::new()).await
1762    }
1763
1764    async fn test_instance_with_insert_select_interceptor(
1765        interceptor: SqlQueryInterceptorRef<Error>,
1766    ) -> TestResult<Instance> {
1767        let plugins = Plugins::new();
1768        plugins.insert::<SqlQueryInterceptorRef<Error>>(interceptor);
1769
1770        test_instance_with_plugins(
1771            test_table(1024, "source")?,
1772            test_table(1025, "target")?,
1773            plugins,
1774        )
1775        .await
1776    }
1777
1778    async fn test_instance_with_plugins(
1779        source_table: TableRef,
1780        target_table: TableRef,
1781        plugins: Plugins,
1782    ) -> TestResult<Instance> {
1783        let kv_backend = Arc::new(MemoryKvBackend::new());
1784        let process_manager = Arc::new(ProcessManager::new("test-frontend".to_string(), None));
1785        let catalog_manager = catalog::memory::MemoryCatalogManager::new_with_table(source_table);
1786        let target_table_name = "target";
1787        catalog_manager
1788            .register_table_sync(catalog::RegisterTableRequest {
1789                catalog: "greptime".to_string(),
1790                schema: "public".to_string(),
1791                table_name: target_table_name.to_string(),
1792                table_id: 1025,
1793                table: target_table,
1794            })
1795            .with_context(|_| RegisterTableSnafu {
1796                table_name: target_table_name.to_string(),
1797            })?;
1798        catalog_manager.register_process_list_table(process_manager.clone());
1799
1800        let cache_registry = test_cache_registry(kv_backend.clone())?;
1801
1802        FrontendBuilder::new(
1803            FrontendOptions::default(),
1804            kv_backend,
1805            cache_registry,
1806            catalog_manager,
1807            Arc::new(client::client_manager::NodeClients::default()),
1808            Arc::new(NoopProcedureExecutor),
1809            process_manager,
1810        )
1811        .with_plugin(plugins)
1812        .try_build()
1813        .await
1814        .context(BuildFrontendSnafu)
1815    }
1816
1817    async fn execute_one_sql(
1818        instance: &Instance,
1819        sql: &str,
1820        query_ctx: QueryContextRef,
1821    ) -> TestResult<Output> {
1822        let mut results = instance.do_query_inner(sql, query_ctx).await;
1823        ensure!(
1824            results.len() == 1,
1825            UnexpectedOutputCountSnafu {
1826                sql: sql.to_string(),
1827                actual: results.len(),
1828            }
1829        );
1830        results.remove(0).with_context(|_| ExecuteSqlSnafu {
1831            sql: sql.to_string(),
1832        })
1833    }
1834
/// Checks `fast_legacy_check` against cache hits, cache misses, conflicting
/// cached modes, and heavy concurrent access to one shared cache entry.
///
/// The concurrent phase is bounded by a real timeout: each worker reports
/// completion over a channel and the main thread waits on it with a deadline.
/// `JoinHandle::join` alone blocks indefinitely, so it cannot turn a deadlock
/// into a test failure.
#[test]
fn test_fast_legacy_check_deadlock_prevention() {
    // Create a DashMap to simulate the cache
    let cache = DashMap::new();

    // Pre-populate cache with some entries
    cache.insert("metric1".to_string(), true); // legacy mode
    cache.insert("metric2".to_string(), false); // prom mode
    cache.insert("metric3".to_string(), true); // legacy mode

    // Test case 1: Normal operation with cache hits
    let metric1 = "metric1".to_string();
    let metric4 = "metric4".to_string();
    let names1 = vec![&metric1, &metric4];
    let result = fast_legacy_check(&cache, &names1);
    assert!(result.is_ok());
    assert_eq!(result.unwrap(), Some(true)); // should return legacy mode

    // Verify that metric4 was added to cache
    assert!(cache.contains_key("metric4"));
    assert!(*cache.get("metric4").unwrap().value());

    // Test case 2: No cache hits
    let metric5 = "metric5".to_string();
    let metric6 = "metric6".to_string();
    let names2 = vec![&metric5, &metric6];
    let result = fast_legacy_check(&cache, &names2);
    assert!(result.is_ok());
    assert_eq!(result.unwrap(), None); // should return None as no cache hits

    // Test case 3: Incompatible modes should return error
    let cache_incompatible = DashMap::new();
    cache_incompatible.insert("metric1".to_string(), true); // legacy
    cache_incompatible.insert("metric2".to_string(), false); // prom
    let metric1_test = "metric1".to_string();
    let metric2_test = "metric2".to_string();
    let names3 = vec![&metric1_test, &metric2_test];
    let result = fast_legacy_check(&cache_incompatible, &names3);
    assert!(result.is_err()); // should error due to incompatible modes

    // Test case 4: Intensive concurrent access to test deadlock prevention
    // This test specifically targets the scenario where multiple threads
    // access the same cache entries simultaneously
    let cache_concurrent = Arc::new(DashMap::new());
    cache_concurrent.insert("shared_metric".to_string(), true);

    let num_threads = 8;
    let operations_per_thread = 100;
    let barrier = Arc::new(Barrier::new(num_threads));
    let success_flag = Arc::new(AtomicBool::new(true));
    // Workers signal completion here so the main thread can wait with a
    // deadline instead of blocking forever in `join()`.
    let (done_tx, done_rx) = std::sync::mpsc::channel::<()>();

    let handles: Vec<_> = (0..num_threads)
        .map(|thread_id| {
            let cache_clone = Arc::clone(&cache_concurrent);
            let barrier_clone = Arc::clone(&barrier);
            let success_flag_clone = Arc::clone(&success_flag);
            let done_tx = done_tx.clone();

            thread::spawn(move || {
                // Wait for all threads to be ready
                barrier_clone.wait();

                let shared_metric = "shared_metric".to_string();
                let start_time = Instant::now();
                for i in 0..operations_per_thread {
                    // Each operation references existing cache entry and adds new ones
                    let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                    let names = vec![&shared_metric, &new_metric];

                    // A failed check, or a worker that has been running for
                    // too long (likely lock contention), marks the run failed.
                    let failed = fast_legacy_check(&cache_clone, &names).is_err()
                        || start_time.elapsed() > Duration::from_secs(10);
                    if failed {
                        success_flag_clone.store(false, Ordering::Relaxed);
                        break;
                    }
                }

                // The receiver may already be gone if the main thread timed
                // out and panicked; nothing useful can be done in that case.
                let _ = done_tx.send(());
            })
        })
        .collect();
    // Drop the original sender so the channel disconnects once every worker
    // has finished or unwound.
    drop(done_tx);

    // Wait for all workers with an overall deadline.
    let deadline = Instant::now() + Duration::from_secs(30);
    for _ in 0..num_threads {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match done_rx.recv_timeout(remaining) {
            Ok(()) => {}
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                panic!("Test timed out - possible deadlock detected!");
            }
            // Every sender is gone without all workers reporting: at least
            // one worker panicked. The joins below report which one.
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }

    // Every worker has finished or panicked by now, so joining cannot hang.
    for (i, handle) in handles.into_iter().enumerate() {
        if handle.join().is_err() {
            panic!("Thread {} panicked during execution", i);
        }
    }

    // Verify all operations completed successfully. `Relaxed` suffices
    // because `join()` already synchronizes with each worker's writes.
    assert!(
        success_flag.load(Ordering::Relaxed),
        "Some operations failed"
    );

    // Verify that many new entries were added (proving operations completed)
    let final_count = cache_concurrent.len();
    assert!(
        final_count > 1 + num_threads * operations_per_thread / 2,
        "Expected more cache entries, got {}",
        final_count
    );
}
1950
/// `should_track_statement_process` must accept plain queries and
/// `INSERT ... SELECT`, and reject `INSERT ... VALUES`.
#[test]
fn test_should_track_statement_process() {
    let cases = [
        ("SELECT * FROM demo", true),
        ("INSERT INTO demo SELECT * FROM source", true),
        ("INSERT INTO demo VALUES (1)", false),
        ("INSERT INTO demo VALUES (now())", false),
    ];

    for (sql, expected) in cases {
        let statement = parse_one_sql(sql);
        assert_eq!(
            should_track_statement_process(&statement),
            expected,
            "unexpected tracking decision for `{sql}`"
        );
    }
}
1966
/// `should_track_plan_process` must track a `SELECT` and an
/// `INSERT ... SELECT`, but not an `INSERT ... VALUES` plan or an insert plan
/// that has no originating statement.
#[test]
fn test_should_track_plan_process() {
    let select_stmt = parse_one_sql("SELECT * FROM demo");
    let insert_select_stmt = parse_one_sql("INSERT INTO demo SELECT * FROM source");
    let insert_values_stmt = parse_one_sql("INSERT INTO demo VALUES (now())");

    let empty_plan = LogicalPlanBuilder::empty(false).build().unwrap();
    let insert_plan = insert_dml_plan();

    let cases = [
        (Some(&select_stmt), &empty_plan, true),
        (Some(&insert_select_stmt), &insert_plan, true),
        (Some(&insert_values_stmt), &insert_plan, false),
        (None, &insert_plan, false),
    ];
    for (statement, plan, expected) in cases {
        assert_eq!(should_track_plan_process(statement, plan), expected);
    }
}
1985
1986    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1987    async fn test_insert_select_is_visible_in_show_processlist() -> TestResult<()> {
1988        let insert_sql = "INSERT INTO target SELECT * FROM source";
1989        let (started_tx, mut started_rx) = mpsc::unbounded_channel();
1990        let (finish_tx, finish_rx) = oneshot::channel();
1991        let interceptor = Arc::new(BlockingInsertSelectInterceptor::new(started_tx, finish_rx));
1992        let instance = Arc::new(test_instance_with_insert_select_interceptor(interceptor).await?);
1993
1994        let insert_task = tokio::spawn({
1995            let instance = instance.clone();
1996            async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await }
1997        });
1998
1999        tokio::time::timeout(Duration::from_secs(5), started_rx.recv())
2000            .await
2001            .context(InsertStartTimeoutSnafu)?
2002            .context(InsertStartChannelClosedSnafu)?;
2003
2004        let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43)).await?;
2005        let process_list = output.data.pretty_print().await;
2006        assert!(
2007            process_list.contains(insert_sql),
2008            "process list did not contain running insert:\n{process_list}"
2009        );
2010
2011        finish_tx
2012            .send(())
2013            .map_err(|_| ReleaseBlockedInsertSnafu.build())?;
2014        insert_task.await.context(InsertTaskPanicSnafu)??;
2015
2016        Ok(())
2017    }
2018
2019    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2020    async fn test_kill_query_cancels_insert_select() -> TestResult<()> {
2021        assert_kill_cancels_insert_select("KILL QUERY 4242").await
2022    }
2023
2024    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2025    async fn test_kill_process_id_cancels_insert_select() -> TestResult<()> {
2026        assert_kill_cancels_insert_select("KILL 'test-frontend/4242'").await
2027    }
2028
2029    async fn assert_kill_cancels_insert_select(kill_sql: &str) -> TestResult<()> {
2030        let insert_sql = "INSERT INTO target SELECT * FROM source";
2031        let (source_polled_tx, source_polled_rx) = oneshot::channel();
2032        let instance = Arc::new(
2033            test_instance_with_tables(
2034                pending_table(1024, "source", source_polled_tx)?,
2035                test_table(1025, "target")?,
2036            )
2037            .await?,
2038        );
2039
2040        let insert_task = tokio::spawn({
2041            let instance = instance.clone();
2042            async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await }
2043        });
2044
2045        tokio::time::timeout(Duration::from_secs(5), source_polled_rx)
2046            .await
2047            .context(SourcePollTimeoutSnafu)?
2048            .context(SourcePollChannelClosedSnafu)?;
2049
2050        let output = execute_one_sql(&instance, kill_sql, test_query_ctx(43)).await?;
2051        assert!(matches!(output.data, OutputData::AffectedRows(1)));
2052
2053        let insert_result = tokio::time::timeout(Duration::from_secs(5), insert_task)
2054            .await
2055            .context(InsertTaskTimeoutSnafu)?
2056            .context(InsertTaskPanicSnafu)?;
2057        let err = match insert_result {
2058            Ok(_) => return InsertSelectNotCancelledSnafu.fail(),
2059            Err(TestError::ExecuteSql { source, .. }) => source,
2060            Err(err) => return Err(err),
2061        };
2062        assert_eq!(StatusCode::Cancelled, err.status_code());
2063
2064        let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43)).await?;
2065        let process_list = output.data.pretty_print().await;
2066        assert!(
2067            !process_list.contains(insert_sql),
2068            "process list still contains killed insert:\n{process_list}"
2069        );
2070
2071        Ok(())
2072    }
2073
2074    fn insert_dml_plan() -> LogicalPlan {
2075        let schema = SchemaRef::new(Schema::new(vec![Field::new(
2076            "value",
2077            DataType::Int64,
2078            true,
2079        )]));
2080        let target = Arc::new(LogicalTableSource::new(schema));
2081        let input = LogicalPlanBuilder::empty(false).build().unwrap();
2082
2083        LogicalPlanBuilder::insert_into(input, "demo", target, InsertOp::Append)
2084            .unwrap()
2085            .build()
2086            .unwrap()
2087    }
2088
/// Exercises `check_permission` with `disallow_cross_catalog_query` enabled:
/// statements that name no catalog or the `greptime` catalog must pass, while
/// statements that name another catalog must be rejected.
#[test]
fn test_exec_validation() {
    /// Fills the `{catalog}` and `{schema}` placeholders of `template`.
    fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
        let mut vars = HashMap::new();
        vars.insert("catalog".to_string(), catalog);
        vars.insert("schema".to_string(), schema);
        template.format(&vars).unwrap()
    }

    /// Checks the first statement of `sql` and expects success iff `is_ok`.
    fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins, &stmts[0], query_ctx);
        if !is_ok {
            assert!(re.is_err());
            return;
        }
        re.unwrap();
    }

    /// Runs `template_sql` with accepted and rejected catalog/schema prefixes.
    fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
        // test right
        let accepted = [("", ""), ("", "public."), ("greptime.", "public.")];
        for (catalog, schema) in accepted {
            let sql = do_fmt(template_sql, catalog, schema);
            do_test(&sql, plugins.clone(), query_ctx, true);
        }

        let rejected = [
            ("wrongcatalog.", "public."),
            ("wrongcatalog.", "wrongschema."),
        ];
        for (catalog, schema) in rejected {
            let sql = do_fmt(template_sql, catalog, schema);
            do_test(&sql, plugins.clone(), query_ctx, false);
        }
    }

    let query_ctx = QueryContext::arc();
    let plugins: Plugins = Plugins::new();
    plugins.insert(QueryOptions {
        disallow_cross_catalog_query: true,
    });

    // Statements without an explicit catalog are always permitted.
    let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
    let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
    assert_eq!(stmts.len(), 4);
    for stmt in stmts.iter() {
        check_permission(plugins.clone(), stmt, &query_ctx).unwrap();
    }

    let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
    let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
    assert_eq!(stmts.len(), 2);
    for stmt in stmts.iter() {
        check_permission(plugins.clone(), stmt, &query_ctx).unwrap();
    }

    // test insert
    let insert_template = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
    // test create table
    let create_template = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
    // test drop table
    let drop_template = "DROP TABLE {catalog}{schema}demo;";
    for template in [insert_template, create_template, drop_template] {
        replace_test(template, plugins.clone(), &query_ctx);
    }

    // test show tables
    let show_public = parse_stmt("SHOW TABLES FROM public", &GreptimeDbDialect {}).unwrap();
    check_permission(plugins.clone(), &show_public[0], &query_ctx).unwrap();

    let show_private = parse_stmt("SHOW TABLES FROM private", &GreptimeDbDialect {}).unwrap();
    let re = check_permission(plugins.clone(), &show_private[0], &query_ctx);
    assert!(re.is_ok());

    // test describe table
    replace_test("DESC TABLE {catalog}{schema}demo;", plugins.clone(), &query_ctx);

    // Flow statements: only the catalog qualifier decides the outcome.
    let comment_flow_cases = [
        ("COMMENT ON FLOW my_flow IS 'comment';", true),
        ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
        ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
    ];
    let show_flow_cases = [
        ("SHOW CREATE FLOW my_flow;", true),
        ("SHOW CREATE FLOW greptime.my_flow;", true),
        ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
    ];
    for (sql, is_ok) in comment_flow_cases.into_iter().chain(show_flow_cases) {
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let result = check_permission(plugins.clone(), &stmts[0], &query_ctx);
        assert_eq!(result.is_ok(), is_ok);
    }
}
2210}