Skip to main content

frontend/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod dashboard;
17mod grpc;
18mod influxdb;
19mod jaeger;
20mod log_handler;
21mod logs;
22mod opentsdb;
23mod otlp;
24pub mod prom_store;
25mod promql;
26mod region_query;
27pub mod standalone;
28
29use std::pin::Pin;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, atomic};
32use std::time::{Duration, SystemTime};
33
34use async_stream::stream;
35use async_trait::async_trait;
36use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
37use catalog::CatalogManagerRef;
38use catalog::process_manager::{
39    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
40};
41use client::OutputData;
42use common_base::Plugins;
43use common_base::cancellation::CancellableFuture;
44use common_error::ext::{BoxedError, ErrorExt};
45use common_event_recorder::EventRecorderRef;
46use common_meta::cache_invalidator::CacheInvalidatorRef;
47use common_meta::key::TableMetadataManagerRef;
48use common_meta::key::table_name::TableNameKey;
49use common_meta::node_manager::NodeManagerRef;
50use common_meta::procedure_executor::ProcedureExecutorRef;
51use common_query::Output;
52use common_recordbatch::RecordBatchStreamWrapper;
53use common_recordbatch::error::StreamTimeoutSnafu;
54use common_telemetry::logging::SlowQueryOptions;
55use common_telemetry::{debug, error, tracing};
56use dashmap::DashMap;
57use datafusion_expr::LogicalPlan;
58use futures::{Stream, StreamExt};
59use lazy_static::lazy_static;
60use operator::delete::DeleterRef;
61use operator::insert::InserterRef;
62use operator::statement::{StatementExecutor, StatementExecutorRef};
63use partition::manager::PartitionRuleManagerRef;
64use pipeline::pipeline_operator::PipelineOperator;
65use prometheus::HistogramTimer;
66use promql_parser::label::Matcher;
67use query::QueryEngineRef;
68use query::metrics::OnDone;
69use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
70use query::query_engine::DescribeResult;
71use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
72use servers::error::{
73    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
74    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
75};
76use servers::interceptor::{
77    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
78};
79use servers::otlp::metrics::legacy_normalize_otlp_name;
80use servers::prometheus_handler::PrometheusHandler;
81use servers::query_handler::sql::SqlQueryHandler;
82use session::context::{Channel, QueryContextRef};
83use session::table_name::table_idents_to_full_name;
84use snafu::prelude::*;
85use sql::ast::ObjectNamePartExt;
86use sql::dialect::Dialect;
87use sql::parser::{ParseOptions, ParserContext};
88use sql::statements::comment::CommentObject;
89use sql::statements::copy::{CopyDatabase, CopyTable};
90use sql::statements::statement::Statement;
91use sql::statements::tql::Tql;
92use sqlparser::ast::ObjectName;
93pub use standalone::StandaloneDatanodeManager;
94use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
95use tracing::Span;
96
97use crate::error::{
98    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
99    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
100    StatementTimeoutSnafu, TableOperationSnafu,
101};
102use crate::service_config::InfluxdbMergeMode;
103use crate::stream_wrapper::CancellableStreamWrapper;
104
105lazy_static! {
106    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
107}
108
109/// The frontend instance contains necessary components, and implements many
110/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
111/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
112#[derive(Clone)]
113pub struct Instance {
114    frontend_peer_addr: String,
115    catalog_manager: CatalogManagerRef,
116    pipeline_operator: Arc<PipelineOperator>,
117    statement_executor: Arc<StatementExecutor>,
118    query_engine: QueryEngineRef,
119    plugins: Plugins,
120    inserter: InserterRef,
121    deleter: DeleterRef,
122    table_metadata_manager: TableMetadataManagerRef,
123    event_recorder: Option<EventRecorderRef>,
124    process_manager: ProcessManagerRef,
125    slow_query_options: SlowQueryOptions,
126    influxdb_default_merge_mode: InfluxdbMergeMode,
127    suspend: Arc<AtomicBool>,
128
129    // cache for otlp metrics
130    // first layer key: db-string
131    // key: direct input metric name
132    // value: if runs in legacy mode
133    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
134}
135
136impl Instance {
137    pub fn frontend_peer_addr(&self) -> &str {
138        &self.frontend_peer_addr
139    }
140
141    pub fn catalog_manager(&self) -> &CatalogManagerRef {
142        &self.catalog_manager
143    }
144
145    pub fn query_engine(&self) -> &QueryEngineRef {
146        &self.query_engine
147    }
148
149    pub fn plugins(&self) -> &Plugins {
150        &self.plugins
151    }
152
153    pub fn statement_executor(&self) -> &StatementExecutorRef {
154        &self.statement_executor
155    }
156
157    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
158        &self.table_metadata_manager
159    }
160
161    pub fn inserter(&self) -> &InserterRef {
162        &self.inserter
163    }
164
165    pub fn process_manager(&self) -> &ProcessManagerRef {
166        &self.process_manager
167    }
168
169    pub fn node_manager(&self) -> &NodeManagerRef {
170        self.inserter.node_manager()
171    }
172
173    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
174        self.inserter.partition_manager()
175    }
176
177    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
178        self.statement_executor.cache_invalidator()
179    }
180
181    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
182        self.statement_executor.procedure_executor()
183    }
184
185    pub fn suspend_state(&self) -> Arc<AtomicBool> {
186        self.suspend.clone()
187    }
188
189    pub(crate) fn is_suspended(&self) -> bool {
190        self.suspend.load(atomic::Ordering::Relaxed)
191    }
192}
193
194fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
195    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
196}
197
198impl Instance {
199    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
200        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
201
202        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
203        let query_interceptor = query_interceptor.as_ref();
204
205        let is_readonly_stmt = stmt.is_readonly();
206        if should_track_statement_process(&stmt) {
207            let slow_query_timer = if is_readonly_stmt {
208                self.slow_query_options
209                    .enable
210                    .then(|| self.event_recorder.clone())
211                    .flatten()
212                    .map(|event_recorder| {
213                        SlowQueryTimer::new(
214                            CatalogQueryStatement::Sql(stmt.clone()),
215                            self.slow_query_options.threshold,
216                            self.slow_query_options.sample_ratio,
217                            self.slow_query_options.record_type,
218                            event_recorder,
219                        )
220                    })
221            } else {
222                None
223            };
224
225            let ticket = self.process_manager.register_query(
226                query_ctx.current_catalog().to_string(),
227                vec![query_ctx.current_schema()],
228                stmt.to_string(),
229                query_ctx.conn_info().to_string(),
230                Some(query_ctx.process_id()),
231                slow_query_timer,
232            );
233
234            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
235
236            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
237                .await
238                .map_err(|_| error::CancelledSnafu.build())?
239                .map(|output| {
240                    let Output { meta, data } = output;
241
242                    let data = match data {
243                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
244                            CancellableStreamWrapper::new(stream, ticket),
245                        )),
246                        other => other,
247                    };
248                    Output { data, meta }
249                })
250        } else {
251            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
252                .await
253        }
254    }
255
256    async fn exec_statement_with_timeout(
257        &self,
258        stmt: Statement,
259        query_ctx: QueryContextRef,
260        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
261    ) -> Result<Output> {
262        let timeout = derive_timeout(&stmt, &query_ctx);
263        match timeout {
264            Some(timeout) => {
265                let start = tokio::time::Instant::now();
266                let output = tokio::time::timeout(
267                    timeout,
268                    self.exec_statement(stmt, query_ctx, query_interceptor),
269                )
270                .await
271                .map_err(|_| StatementTimeoutSnafu.build())??;
272                // compute remaining timeout
273                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
274                attach_timeout(output, remaining_timeout)
275            }
276            None => {
277                self.exec_statement(stmt, query_ctx, query_interceptor)
278                    .await
279            }
280        }
281    }
282
283    async fn exec_statement(
284        &self,
285        stmt: Statement,
286        query_ctx: QueryContextRef,
287        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
288    ) -> Result<Output> {
289        match stmt {
290            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
291                // TODO: remove this when format is supported in datafusion
292                if let Statement::Explain(explain) = &stmt
293                    && let Some(format) = explain.format()
294                {
295                    query_ctx.set_explain_format(format.to_string());
296                }
297
298                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
299                    .await
300            }
301            Statement::Tql(tql) => {
302                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
303                    .await
304            }
305            _ => {
306                query_interceptor.pre_execute(Some(&stmt), None, query_ctx.clone())?;
307                self.statement_executor
308                    .execute_sql(stmt, query_ctx)
309                    .await
310                    .context(TableOperationSnafu)
311            }
312        }
313    }
314
315    async fn plan_and_exec_sql(
316        &self,
317        stmt: Statement,
318        query_ctx: &QueryContextRef,
319        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
320    ) -> Result<Output> {
321        let stmt = QueryStatement::Sql(stmt);
322        let plan = self
323            .statement_executor
324            .plan(&stmt, query_ctx.clone())
325            .await?;
326        let QueryStatement::Sql(stmt) = stmt else {
327            unreachable!()
328        };
329        query_interceptor.pre_execute(Some(&stmt), Some(&plan), query_ctx.clone())?;
330
331        self.statement_executor
332            .exec_plan(plan, query_ctx.clone())
333            .await
334            .context(TableOperationSnafu)
335    }
336
337    async fn plan_and_exec_tql(
338        &self,
339        query_ctx: &QueryContextRef,
340        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
341        tql: Tql,
342    ) -> Result<Output> {
343        let plan = self
344            .statement_executor
345            .plan_tql(tql.clone(), query_ctx)
346            .await?;
347        query_interceptor.pre_execute(
348            Some(&Statement::Tql(tql)),
349            Some(&plan),
350            query_ctx.clone(),
351        )?;
352        self.statement_executor
353            .exec_plan(plan, query_ctx.clone())
354            .await
355            .context(TableOperationSnafu)
356    }
357
358    async fn check_otlp_legacy(
359        &self,
360        names: &[&String],
361        ctx: QueryContextRef,
362    ) -> server_error::Result<bool> {
363        let db_string = ctx.get_db_string();
364        // fast cache check
365        let cache = self
366            .otlp_metrics_table_legacy_cache
367            .entry(db_string.clone())
368            .or_default();
369        if let Some(flag) = fast_legacy_check(&cache, names)? {
370            return Ok(flag);
371        }
372        // release cache reference to avoid lock contention
373        drop(cache);
374
375        let catalog = ctx.current_catalog();
376        let schema = ctx.current_schema();
377
378        // query legacy table names
379        let normalized_names = names
380            .iter()
381            .map(|n| legacy_normalize_otlp_name(n))
382            .collect::<Vec<_>>();
383        let table_names = normalized_names
384            .iter()
385            .map(|n| TableNameKey::new(catalog, &schema, n))
386            .collect::<Vec<_>>();
387        let table_values = self
388            .table_metadata_manager()
389            .table_name_manager()
390            .batch_get(table_names)
391            .await
392            .context(CommonMetaSnafu)?;
393        let table_ids = table_values
394            .into_iter()
395            .filter_map(|v| v.map(|vi| vi.table_id()))
396            .collect::<Vec<_>>();
397
398        // means no existing table is found, use new mode
399        if table_ids.is_empty() {
400            let cache = self
401                .otlp_metrics_table_legacy_cache
402                .entry(db_string)
403                .or_default();
404            names.iter().for_each(|name| {
405                cache.insert((*name).clone(), false);
406            });
407            return Ok(false);
408        }
409
410        // has existing table, check table options
411        let table_infos = self
412            .table_metadata_manager()
413            .table_info_manager()
414            .batch_get(&table_ids)
415            .await
416            .context(CommonMetaSnafu)?;
417        let options = table_infos
418            .values()
419            .map(|info| {
420                info.table_info
421                    .meta
422                    .options
423                    .extra_options
424                    .get(OTLP_METRIC_COMPAT_KEY)
425                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
426            })
427            .collect::<Vec<_>>();
428        let cache = self
429            .otlp_metrics_table_legacy_cache
430            .entry(db_string)
431            .or_default();
432        if !options.is_empty() {
433            // check value consistency
434            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
435            let has_legacy = options
436                .iter()
437                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
438            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
439            let flag = has_legacy;
440            names.iter().for_each(|name| {
441                cache.insert((*name).clone(), flag);
442            });
443            Ok(flag)
444        } else {
445            // no table info, use new mode
446            names.iter().for_each(|name| {
447                cache.insert((*name).clone(), false);
448            });
449            Ok(false)
450        }
451    }
452}
453
454fn fast_legacy_check(
455    cache: &DashMap<String, bool>,
456    names: &[&String],
457) -> server_error::Result<Option<bool>> {
458    let hit_cache = names
459        .iter()
460        .filter_map(|name| cache.get(*name))
461        .collect::<Vec<_>>();
462    if !hit_cache.is_empty() {
463        let hit_legacy = hit_cache.iter().any(|en| *en.value());
464        let hit_prom = hit_cache.iter().any(|en| !*en.value());
465
466        // hit but have true and false, means both legacy and new mode are used
467        // we cannot handle this case, so return error
468        // add doc links in err msg later
469        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
470
471        let flag = hit_legacy;
472        // drop hit_cache to release references before inserting to avoid deadlock
473        drop(hit_cache);
474
475        // set cache for all names
476        names.iter().for_each(|name| {
477            if !cache.contains_key(*name) {
478                cache.insert((*name).clone(), flag);
479            }
480        });
481        Ok(Some(flag))
482    } else {
483        Ok(None)
484    }
485}
486
487/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
488/// For MySQL, it applies only to read-only statements.
489fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
490    let query_timeout = query_ctx.query_timeout()?;
491    if query_timeout.is_zero() {
492        return None;
493    }
494    match query_ctx.channel() {
495        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
496        Channel::Postgres => Some(query_timeout),
497        _ => None,
498    }
499}
500
501/// Derives timeout for plan execution.
502fn derive_timeout_for_plan(plan: &LogicalPlan, query_ctx: &QueryContextRef) -> Option<Duration> {
503    let query_timeout = query_ctx.query_timeout()?;
504    if query_timeout.is_zero() {
505        return None;
506    }
507    match query_ctx.channel() {
508        Channel::Mysql if is_readonly_plan(plan) => Some(query_timeout),
509        Channel::Postgres => Some(query_timeout),
510        _ => None,
511    }
512}
513
514fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
515    if timeout.is_zero() {
516        return StatementTimeoutSnafu.fail();
517    }
518
519    let output = match output.data {
520        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
521        OutputData::Stream(mut stream) => {
522            let schema = stream.schema();
523            let s = Box::pin(stream! {
524                let mut start = tokio::time::Instant::now();
525                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
526                    yield item;
527
528                    let now = tokio::time::Instant::now();
529                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
530                    start = now;
531                    // tokio::time::timeout may not return an error immediately when timeout is 0.
532                    if timeout.is_zero() {
533                        StreamTimeoutSnafu.fail()?;
534                    }
535                }
536            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
537            let stream = RecordBatchStreamWrapper {
538                schema,
539                stream: s,
540                output_ordering: None,
541                metrics: Default::default(),
542                span: Span::current(),
543            };
544            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
545        }
546    };
547
548    Ok(output)
549}
550
551impl Instance {
552    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
553    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
554        if self.is_suspended() {
555            return vec![error::SuspendedSnafu {}.fail()];
556        }
557
558        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
559        let query_interceptor = query_interceptor_opt.as_ref();
560        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
561            Ok(q) => q,
562            Err(e) => return vec![Err(e)],
563        };
564
565        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
566        let checker = checker_ref.as_ref();
567
568        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
569            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
570        {
571            Ok(stmts) => {
572                if stmts.is_empty() {
573                    return vec![
574                        InvalidSqlSnafu {
575                            err_msg: "empty statements",
576                        }
577                        .fail(),
578                    ];
579                }
580
581                let mut results = Vec::with_capacity(stmts.len());
582                for stmt in stmts {
583                    if let Err(e) = checker
584                        .check_permission(
585                            query_ctx.current_user(),
586                            PermissionReq::SqlStatement(&stmt),
587                        )
588                        .context(PermissionSnafu)
589                    {
590                        results.push(Err(e));
591                        break;
592                    }
593
594                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
595                        Ok(output) => {
596                            let output_result =
597                                query_interceptor.post_execute(output, query_ctx.clone());
598                            results.push(output_result);
599                        }
600                        Err(e) => {
601                            if e.status_code().should_log_error() {
602                                error!(e; "Failed to execute query: {stmt}");
603                            } else {
604                                debug!("Failed to execute query: {stmt}, {e}");
605                            }
606                            results.push(Err(e));
607                            break;
608                        }
609                    }
610                }
611                results
612            }
613            Err(e) => {
614                vec![Err(e)]
615            }
616        }
617    }
618
619    async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
620        self.query_engine
621            .execute(plan, query_ctx)
622            .await
623            .context(ExecLogicalPlanSnafu)
624    }
625
626    async fn exec_plan_with_timeout(
627        &self,
628        plan: LogicalPlan,
629        query_ctx: QueryContextRef,
630    ) -> Result<Output> {
631        let timeout = derive_timeout_for_plan(&plan, &query_ctx);
632        match timeout {
633            Some(timeout) => {
634                let start = tokio::time::Instant::now();
635                let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
636                    .await
637                    .map_err(|_| StatementTimeoutSnafu.build())??;
638                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
639                attach_timeout(output, remaining_timeout)
640            }
641            None => self.exec_plan(plan, query_ctx).await,
642        }
643    }
644
645    async fn do_exec_plan_inner(
646        &self,
647        plan: LogicalPlan,
648        stmt: Option<Statement>,
649        query_ctx: QueryContextRef,
650    ) -> Result<Output> {
651        ensure!(!self.is_suspended(), error::SuspendedSnafu);
652
653        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
654        let query_interceptor = query_interceptor_opt.as_ref();
655
656        query_interceptor.pre_execute(stmt.as_ref(), Some(&plan), query_ctx.clone())?;
657
658        let query = stmt
659            .as_ref()
660            .map(|s| s.to_string())
661            .unwrap_or_else(|| plan.display_indent().to_string());
662
663        let plan_is_readonly = is_readonly_plan(&plan);
664        let result = if should_track_plan_process(stmt.as_ref(), &plan) {
665            let slow_query_timer = if plan_is_readonly {
666                self.slow_query_options
667                    .enable
668                    .then(|| self.event_recorder.clone())
669                    .flatten()
670                    .map(|event_recorder| {
671                        SlowQueryTimer::new(
672                            CatalogQueryStatement::Plan(query.clone()),
673                            self.slow_query_options.threshold,
674                            self.slow_query_options.sample_ratio,
675                            self.slow_query_options.record_type,
676                            event_recorder,
677                        )
678                    })
679            } else {
680                None
681            };
682
683            let ticket = self.process_manager.register_query(
684                query_ctx.current_catalog().to_string(),
685                vec![query_ctx.current_schema()],
686                query,
687                query_ctx.conn_info().to_string(),
688                Some(query_ctx.process_id()),
689                slow_query_timer,
690            );
691
692            let query_fut = self.exec_plan_with_timeout(plan, query_ctx.clone());
693
694            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
695                .await
696                .map_err(|_| error::CancelledSnafu.build())?
697                .map(|output| {
698                    let Output { meta, data } = output;
699
700                    let data = match data {
701                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
702                            CancellableStreamWrapper::new(stream, ticket),
703                        )),
704                        other => other,
705                    };
706                    Output { data, meta }
707                })
708        } else {
709            self.exec_plan_with_timeout(plan, query_ctx.clone()).await
710        };
711
712        result.and_then(|output| query_interceptor.post_execute(output, query_ctx))
713    }
714
715    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
716    async fn do_promql_query_inner(
717        &self,
718        query: &PromQuery,
719        query_ctx: QueryContextRef,
720    ) -> Vec<Result<Output>> {
721        if self.is_suspended() {
722            return vec![error::SuspendedSnafu {}.fail()];
723        }
724
725        // check will be done in prometheus handler's do_query
726        let result = PrometheusHandler::do_query(self, query, query_ctx)
727            .await
728            .with_context(|_| ExecutePromqlSnafu {
729                query: format!("{query:?}"),
730            });
731        vec![result]
732    }
733
734    async fn do_describe_inner(
735        &self,
736        stmt: Statement,
737        query_ctx: QueryContextRef,
738    ) -> Result<Option<DescribeResult>> {
739        ensure!(!self.is_suspended(), error::SuspendedSnafu);
740
741        // EXPLAIN / EXPLAIN ANALYZE wrap an inner statement; describe them when the
742        // wrapped statement is something we already plan (so that bind parameters
743        // in the inner query get their types inferred). See #8029.
744        let is_inner_plannable = |s: &Statement| {
745            matches!(
746                s,
747                Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
748            )
749        };
750        let plannable = is_inner_plannable(&stmt)
751            || matches!(&stmt, Statement::Explain(explain) if is_inner_plannable(explain.statement.as_ref()));
752
753        if plannable {
754            self.plugins
755                .get::<PermissionCheckerRef>()
756                .as_ref()
757                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
758                .context(PermissionSnafu)?;
759
760            let plan = self
761                .query_engine
762                .planner()
763                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
764                .await
765                .context(PlanStatementSnafu)?;
766            self.query_engine
767                .describe(plan, query_ctx)
768                .await
769                .map(Some)
770                .context(error::DescribeStatementSnafu)
771        } else {
772            Ok(None)
773        }
774    }
775
776    async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
777        self.catalog_manager
778            .schema_exists(catalog, schema, None)
779            .await
780            .context(error::CatalogSnafu)
781    }
782}
783
784#[async_trait]
785impl SqlQueryHandler for Instance {
786    async fn do_query(
787        &self,
788        query: &str,
789        query_ctx: QueryContextRef,
790    ) -> Vec<server_error::Result<Output>> {
791        self.do_query_inner(query, query_ctx)
792            .await
793            .into_iter()
794            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
795            .collect()
796    }
797
798    async fn do_exec_plan(
799        &self,
800        plan: LogicalPlan,
801        stmt: Option<Statement>,
802        query_ctx: QueryContextRef,
803    ) -> server_error::Result<Output> {
804        self.do_exec_plan_inner(plan, stmt, query_ctx)
805            .await
806            .map_err(BoxedError::new)
807            .context(server_error::ExecutePlanSnafu)
808    }
809
810    async fn do_promql_query(
811        &self,
812        query: &PromQuery,
813        query_ctx: QueryContextRef,
814    ) -> Vec<server_error::Result<Output>> {
815        self.do_promql_query_inner(query, query_ctx)
816            .await
817            .into_iter()
818            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
819            .collect()
820    }
821
822    async fn do_describe(
823        &self,
824        stmt: Statement,
825        query_ctx: QueryContextRef,
826    ) -> server_error::Result<Option<DescribeResult>> {
827        self.do_describe_inner(stmt, query_ctx)
828            .await
829            .map_err(BoxedError::new)
830            .context(server_error::DescribeStatementSnafu)
831    }
832
833    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
834        self.is_valid_schema_inner(catalog, schema)
835            .await
836            .map_err(BoxedError::new)
837            .context(server_error::CheckDatabaseValiditySnafu)
838    }
839}
840
841/// Attaches a timer to the output and observes it once the output is exhausted.
842pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
843    match output.data {
844        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
845        OutputData::Stream(stream) => {
846            let stream = OnDone::new(stream, move || {
847                timer.observe_duration();
848            });
849            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
850        }
851    }
852}
853
854#[async_trait]
855impl PrometheusHandler for Instance {
856    #[tracing::instrument(skip_all)]
857    async fn do_query(
858        &self,
859        query: &PromQuery,
860        query_ctx: QueryContextRef,
861    ) -> server_error::Result<Output> {
862        let interceptor = self
863            .plugins
864            .get::<PromQueryInterceptorRef<server_error::Error>>();
865
866        self.plugins
867            .get::<PermissionCheckerRef>()
868            .as_ref()
869            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
870            .context(AuthSnafu)?;
871
872        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
873            ParsePromQLSnafu {
874                query: query.clone(),
875            }
876        })?;
877
878        let plan = self
879            .statement_executor
880            .plan(&stmt, query_ctx.clone())
881            .await
882            .map_err(BoxedError::new)
883            .context(ExecuteQuerySnafu)?;
884
885        let QueryStatement::Promql(eval_stmt, _) = &stmt else {
886            unreachable!("query is parsed from promql");
887        };
888
889        interceptor.pre_execute(query, &eval_stmt.expr, Some(&plan), query_ctx.clone())?;
890
891        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
892        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
893            CatalogQueryStatement::Promql(eval_stmt, alias)
894        } else {
895            // It should not happen since the query is already parsed successfully.
896            return UnexpectedResultSnafu {
897                reason: "The query should always be promql.".to_string(),
898            }
899            .fail();
900        };
901        let raw_query = query_statement.to_string();
902
903        let slow_query_timer = self
904            .slow_query_options
905            .enable
906            .then(|| self.event_recorder.clone())
907            .flatten()
908            .map(|event_recorder| {
909                SlowQueryTimer::new(
910                    query_statement,
911                    self.slow_query_options.threshold,
912                    self.slow_query_options.sample_ratio,
913                    self.slow_query_options.record_type,
914                    event_recorder,
915                )
916            });
917
918        let ticket = self.process_manager.register_query(
919            query_ctx.current_catalog().to_string(),
920            vec![query_ctx.current_schema()],
921            raw_query,
922            query_ctx.conn_info().to_string(),
923            Some(query_ctx.process_id()),
924            slow_query_timer,
925        );
926
927        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
928
929        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
930            .await
931            .map_err(|_| servers::error::CancelledSnafu.build())?
932            .map(|output| {
933                let Output { meta, data } = output;
934                let data = match data {
935                    OutputData::Stream(stream) => {
936                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
937                    }
938                    other => other,
939                };
940                Output { data, meta }
941            })
942            .map_err(BoxedError::new)
943            .context(ExecuteQuerySnafu)?;
944
945        Ok(interceptor.post_execute(output, query_ctx)?)
946    }
947
948    async fn query_metric_names(
949        &self,
950        matchers: Vec<Matcher>,
951        ctx: &QueryContextRef,
952    ) -> server_error::Result<Vec<String>> {
953        self.handle_query_metric_names(matchers, ctx)
954            .await
955            .map_err(BoxedError::new)
956            .context(ExecuteQuerySnafu)
957    }
958
959    async fn query_label_values(
960        &self,
961        metric: String,
962        label_name: String,
963        matchers: Vec<Matcher>,
964        start: SystemTime,
965        end: SystemTime,
966        ctx: &QueryContextRef,
967    ) -> server_error::Result<Vec<String>> {
968        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
969            .await
970            .map_err(BoxedError::new)
971            .context(ExecuteQuerySnafu)
972    }
973
974    fn catalog_manager(&self) -> CatalogManagerRef {
975        self.catalog_manager.clone()
976    }
977}
978
979/// Validate `stmt.database` permission if it's presented.
980macro_rules! validate_db_permission {
981    ($stmt: expr, $query_ctx: expr) => {
982        if let Some(database) = &$stmt.database {
983            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
984                .map_err(BoxedError::new)
985                .context(SqlExecInterceptedSnafu)?;
986        }
987    };
988}
989
990pub fn check_permission(
991    plugins: Plugins,
992    stmt: &Statement,
993    query_ctx: &QueryContextRef,
994) -> Result<()> {
995    let need_validate = plugins
996        .get::<QueryOptions>()
997        .map(|opts| opts.disallow_cross_catalog_query)
998        .unwrap_or_default();
999
1000    if !need_validate {
1001        return Ok(());
1002    }
1003
1004    match stmt {
1005        // Will be checked in execution.
1006        // TODO(dennis): add a hook for admin commands.
1007        Statement::Admin(_) => {}
1008        // These are executed by query engine, and will be checked there.
1009        Statement::Query(_)
1010        | Statement::Explain(_)
1011        | Statement::Tql(_)
1012        | Statement::Delete(_)
1013        | Statement::DeclareCursor(_)
1014        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
1015        // database ops won't be checked
1016        Statement::CreateDatabase(_)
1017        | Statement::ShowDatabases(_)
1018        | Statement::DropDatabase(_)
1019        | Statement::AlterDatabase(_)
1020        | Statement::DropFlow(_)
1021        | Statement::Use(_) => {}
1022        #[cfg(feature = "enterprise")]
1023        Statement::DropTrigger(_) => {}
1024        Statement::ShowCreateDatabase(stmt) => {
1025            validate_database(&stmt.database_name, query_ctx)?;
1026        }
1027        Statement::ShowCreateTable(stmt) => {
1028            validate_param(&stmt.table_name, query_ctx)?;
1029        }
1030        Statement::ShowCreateFlow(stmt) => {
1031            validate_flow(&stmt.flow_name, query_ctx)?;
1032        }
1033        #[cfg(feature = "enterprise")]
1034        Statement::ShowCreateTrigger(stmt) => {
1035            validate_param(&stmt.trigger_name, query_ctx)?;
1036        }
1037        Statement::ShowCreateView(stmt) => {
1038            validate_param(&stmt.view_name, query_ctx)?;
1039        }
1040        Statement::CreateExternalTable(stmt) => {
1041            validate_param(&stmt.name, query_ctx)?;
1042        }
1043        Statement::CreateFlow(stmt) => {
1044            // TODO: should also validate source table name here?
1045            validate_param(&stmt.sink_table_name, query_ctx)?;
1046        }
1047        #[cfg(feature = "enterprise")]
1048        Statement::CreateTrigger(stmt) => {
1049            validate_param(&stmt.trigger_name, query_ctx)?;
1050        }
1051        Statement::CreateView(stmt) => {
1052            validate_param(&stmt.name, query_ctx)?;
1053        }
1054        Statement::AlterTable(stmt) => {
1055            validate_param(stmt.table_name(), query_ctx)?;
1056        }
1057        #[cfg(feature = "enterprise")]
1058        Statement::AlterTrigger(_) => {}
1059        // set/show variable now only alter/show variable in session
1060        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
1061        // show charset and show collation won't be checked
1062        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
1063
1064        Statement::Comment(comment) => match &comment.object {
1065            CommentObject::Table(table) => validate_param(table, query_ctx)?,
1066            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
1067            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
1068        },
1069
1070        Statement::Insert(insert) => {
1071            let name = insert.table_name().context(ParseSqlSnafu)?;
1072            validate_param(name, query_ctx)?;
1073        }
1074        Statement::CreateTable(stmt) => {
1075            validate_param(&stmt.name, query_ctx)?;
1076        }
1077        Statement::CreateTableLike(stmt) => {
1078            validate_param(&stmt.table_name, query_ctx)?;
1079            validate_param(&stmt.source_name, query_ctx)?;
1080        }
1081        Statement::DropTable(drop_stmt) => {
1082            for table_name in drop_stmt.table_names() {
1083                validate_param(table_name, query_ctx)?;
1084            }
1085        }
1086        Statement::DropView(stmt) => {
1087            validate_param(&stmt.view_name, query_ctx)?;
1088        }
1089        Statement::ShowTables(stmt) => {
1090            validate_db_permission!(stmt, query_ctx);
1091        }
1092        Statement::ShowTableStatus(stmt) => {
1093            validate_db_permission!(stmt, query_ctx);
1094        }
1095        Statement::ShowColumns(stmt) => {
1096            validate_db_permission!(stmt, query_ctx);
1097        }
1098        Statement::ShowIndex(stmt) => {
1099            validate_db_permission!(stmt, query_ctx);
1100        }
1101        Statement::ShowRegion(stmt) => {
1102            validate_db_permission!(stmt, query_ctx);
1103        }
1104        Statement::ShowViews(stmt) => {
1105            validate_db_permission!(stmt, query_ctx);
1106        }
1107        Statement::ShowFlows(stmt) => {
1108            validate_db_permission!(stmt, query_ctx);
1109        }
1110        #[cfg(feature = "enterprise")]
1111        Statement::ShowTriggers(_stmt) => {
1112            // The trigger is organized based on the catalog dimension, so there
1113            // is no need to check the permission of the database(schema).
1114        }
1115        Statement::ShowStatus(_stmt) => {}
1116        Statement::ShowSearchPath(_stmt) => {}
1117        Statement::DescribeTable(stmt) => {
1118            validate_param(stmt.name(), query_ctx)?;
1119        }
1120        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
1121            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
1122            CopyTable::From(copy_table_from) => {
1123                validate_param(&copy_table_from.table_name, query_ctx)?
1124            }
1125        },
1126        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
1127            match copy_database {
1128                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1129                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1130            }
1131        }
1132        Statement::TruncateTable(stmt) => {
1133            validate_param(stmt.table_name(), query_ctx)?;
1134        }
1135        // cursor operations are always allowed once it's created
1136        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
1137        // User can only kill process in their own catalog.
1138        Statement::Kill(_) => {}
1139        // SHOW PROCESSLIST
1140        Statement::ShowProcesslist(_) => {}
1141    }
1142    Ok(())
1143}
1144
1145fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1146    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1147        .map_err(BoxedError::new)
1148        .context(ExternalSnafu)?;
1149
1150    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1151        .map_err(BoxedError::new)
1152        .context(SqlExecInterceptedSnafu)
1153}
1154
1155fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1156    let catalog = match &name.0[..] {
1157        [_flow] => query_ctx.current_catalog().to_string(),
1158        [catalog, _flow] => catalog.to_string_unquoted(),
1159        _ => {
1160            return InvalidSqlSnafu {
1161                err_msg: format!(
1162                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1163                ),
1164            }
1165            .fail();
1166        }
1167    };
1168
1169    let schema = query_ctx.current_schema();
1170
1171    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1172        .map_err(BoxedError::new)
1173        .context(SqlExecInterceptedSnafu)
1174}
1175
1176fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1177    let (catalog, schema) = match &name.0[..] {
1178        [schema] => (
1179            query_ctx.current_catalog().to_string(),
1180            schema.to_string_unquoted(),
1181        ),
1182        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1183        _ => InvalidSqlSnafu {
1184            err_msg: format!(
1185                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1186            ),
1187        }
1188        .fail()?,
1189    };
1190
1191    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1192        .map_err(BoxedError::new)
1193        .context(SqlExecInterceptedSnafu)
1194}
1195
1196fn is_readonly_plan(plan: &LogicalPlan) -> bool {
1197    !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
1198}
1199
1200fn should_track_statement_process(stmt: &Statement) -> bool {
1201    stmt.is_readonly()
1202        || matches!(stmt, Statement::Insert(insert) if insert.has_non_values_query_source())
1203}
1204
1205fn should_track_plan_process(stmt: Option<&Statement>, plan: &LogicalPlan) -> bool {
1206    is_readonly_plan(plan)
1207        || matches!(stmt, Some(Statement::Insert(insert)) if insert.has_non_values_query_source())
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212    use std::collections::HashMap;
1213    use std::future::Future;
1214    use std::pin::Pin;
1215    use std::sync::atomic::{AtomicBool, Ordering};
1216    use std::sync::{Arc, Barrier};
1217    use std::task::{Context, Poll};
1218    use std::thread;
1219    use std::time::{Duration, Instant};
1220
1221    use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
1222    use catalog::process_manager::ProcessManager;
1223    use common_base::Plugins;
1224    use common_error::ext::{BoxedError, PlainError};
1225    use common_error::status_code::StatusCode;
1226    use common_meta::cache::LayeredCacheRegistryBuilder;
1227    use common_meta::kv_backend::memory::MemoryKvBackend;
1228    use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
1229    use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
1230    use common_meta::rpc::procedure::{
1231        MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
1232    };
1233    use common_query::Output;
1234    use common_recordbatch::{
1235        OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream,
1236    };
1237    use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1238    use datafusion_expr::dml::InsertOp;
1239    use datafusion_expr::{LogicalPlanBuilder, LogicalTableSource};
1240    use datatypes::prelude::ConcreteDataType;
1241    use datatypes::schema::{ColumnSchema, Schema as GtSchema, SchemaRef as GtSchemaRef};
1242    use query::query_engine::options::QueryOptions;
1243    use session::context::{Channel, ConnInfo, QueryContext, QueryContextBuilder};
1244    use snafu::{Location, Snafu};
1245    use sql::dialect::GreptimeDbDialect;
1246    use store_api::data_source::DataSource;
1247    use store_api::storage::ScanRequest;
1248    use strfmt::Format;
1249    use table::metadata::{FilterPushDownType, TableInfo, TableInfoBuilder, TableMetaBuilder};
1250    use table::test_util::EmptyTable;
1251    use table::{Table, TableRef};
1252    use tokio::sync::{mpsc, oneshot};
1253
1254    use super::*;
1255    use crate::frontend::FrontendOptions;
1256    use crate::instance::builder::FrontendBuilder;
1257
1258    #[derive(Debug, Snafu)]
1259    enum TestError {
1260        #[snafu(display("Failed to build test cache registry"))]
1261        BuildCacheRegistry {
1262            source: cache::error::Error,
1263            #[snafu(implicit)]
1264            location: Location,
1265        },
1266
1267        #[snafu(display("Failed to build test table meta for table: {table_name}"))]
1268        BuildTableMeta {
1269            table_name: String,
1270            source: table::metadata::TableMetaBuilderError,
1271            #[snafu(implicit)]
1272            location: Location,
1273        },
1274
1275        #[snafu(display("Failed to build test table info for table: {table_name}"))]
1276        BuildTableInfo {
1277            table_name: String,
1278            source: table::metadata::TableInfoBuilderError,
1279            #[snafu(implicit)]
1280            location: Location,
1281        },
1282
1283        #[snafu(display("Failed to register test table: {table_name}"))]
1284        RegisterTable {
1285            table_name: String,
1286            source: catalog::error::Error,
1287            #[snafu(implicit)]
1288            location: Location,
1289        },
1290
1291        #[snafu(display("Failed to build test frontend instance"))]
1292        BuildFrontend {
1293            source: crate::error::Error,
1294            #[snafu(implicit)]
1295            location: Location,
1296        },
1297
1298        #[snafu(display("Expected exactly one output for SQL `{sql}`, got {actual}"))]
1299        UnexpectedOutputCount {
1300            sql: String,
1301            actual: usize,
1302            #[snafu(implicit)]
1303            location: Location,
1304        },
1305
1306        #[snafu(display("Failed to execute SQL `{sql}`"))]
1307        ExecuteSql {
1308            sql: String,
1309            source: crate::error::Error,
1310            #[snafu(implicit)]
1311            location: Location,
1312        },
1313
1314        #[snafu(display("Timed out waiting for insert-select start notification"))]
1315        InsertStartTimeout {
1316            source: tokio::time::error::Elapsed,
1317            #[snafu(implicit)]
1318            location: Location,
1319        },
1320
1321        #[snafu(display("Insert-select start notification channel closed"))]
1322        InsertStartChannelClosed {
1323            #[snafu(implicit)]
1324            location: Location,
1325        },
1326
1327        #[snafu(display("Failed to release blocking insert-select interceptor"))]
1328        ReleaseBlockedInsert {
1329            #[snafu(implicit)]
1330            location: Location,
1331        },
1332
1333        #[snafu(display("Timed out waiting for insert-select source to be polled"))]
1334        SourcePollTimeout {
1335            source: tokio::time::error::Elapsed,
1336            #[snafu(implicit)]
1337            location: Location,
1338        },
1339
1340        #[snafu(display("Insert-select source poll notification channel closed"))]
1341        SourcePollChannelClosed {
1342            source: oneshot::error::RecvError,
1343            #[snafu(implicit)]
1344            location: Location,
1345        },
1346
1347        #[snafu(display("Timed out waiting for insert task to finish"))]
1348        InsertTaskTimeout {
1349            source: tokio::time::error::Elapsed,
1350            #[snafu(implicit)]
1351            location: Location,
1352        },
1353
1354        #[snafu(display("Insert task panicked"))]
1355        InsertTaskPanic {
1356            source: tokio::task::JoinError,
1357            #[snafu(implicit)]
1358            location: Location,
1359        },
1360
1361        #[snafu(display("Expected insert-select to be cancelled"))]
1362        InsertSelectNotCancelled {
1363            #[snafu(implicit)]
1364            location: Location,
1365        },
1366    }
1367
1368    type TestResult<T> = std::result::Result<T, TestError>;
1369
1370    fn parse_one_sql(sql: &str) -> Statement {
1371        parse_stmt(sql, &GreptimeDbDialect {}).unwrap().remove(0)
1372    }
1373
1374    fn test_query_ctx(process_id: u32) -> QueryContextRef {
1375        Arc::new(
1376            QueryContextBuilder::default()
1377                .channel(Channel::Mysql)
1378                .conn_info(ConnInfo::new(None, Channel::Mysql))
1379                .process_id(process_id)
1380                .build(),
1381        )
1382    }
1383
1384    struct BlockingInsertSelectInterceptor {
1385        started_tx: mpsc::UnboundedSender<()>,
1386        finish_rx: std::sync::Mutex<Option<oneshot::Receiver<()>>>,
1387    }
1388
1389    impl BlockingInsertSelectInterceptor {
1390        fn new(started_tx: mpsc::UnboundedSender<()>, finish_rx: oneshot::Receiver<()>) -> Self {
1391            Self {
1392                started_tx,
1393                finish_rx: std::sync::Mutex::new(Some(finish_rx)),
1394            }
1395        }
1396    }
1397
1398    impl SqlQueryInterceptor for BlockingInsertSelectInterceptor {
1399        type Error = Error;
1400
1401        fn pre_execute(
1402            &self,
1403            statement: Option<&Statement>,
1404            _plan: Option<&LogicalPlan>,
1405            _query_ctx: QueryContextRef,
1406        ) -> Result<()> {
1407            let Some(Statement::Insert(insert)) = statement else {
1408                return Ok(());
1409            };
1410            if !insert.has_non_values_query_source() {
1411                return Ok(());
1412            }
1413
1414            let finish_rx = self.finish_rx.lock().unwrap().take().unwrap();
1415            let _ = self.started_tx.send(());
1416            tokio::task::block_in_place(|| {
1417                tokio::runtime::Handle::current()
1418                    .block_on(finish_rx)
1419                    .unwrap();
1420            });
1421            Ok(())
1422        }
1423    }
1424
1425    struct PendingRecordBatchStream {
1426        schema: GtSchemaRef,
1427        polled_tx: Option<oneshot::Sender<()>>,
1428        _finish_tx: oneshot::Sender<()>,
1429        finish_rx: Pin<Box<oneshot::Receiver<()>>>,
1430    }
1431
1432    impl RecordBatchStream for PendingRecordBatchStream {
1433        fn schema(&self) -> GtSchemaRef {
1434            self.schema.clone()
1435        }
1436
1437        fn output_ordering(&self) -> Option<&[OrderOption]> {
1438            None
1439        }
1440
1441        fn metrics(&self) -> Option<common_recordbatch::adapter::RecordBatchMetrics> {
1442            None
1443        }
1444    }
1445
1446    impl Stream for PendingRecordBatchStream {
1447        type Item = common_recordbatch::error::Result<RecordBatch>;
1448
1449        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1450            if let Some(polled_tx) = self.polled_tx.take() {
1451                let _ = polled_tx.send(());
1452            }
1453
1454            match self.finish_rx.as_mut().poll(cx) {
1455                Poll::Ready(_) => Poll::Ready(None),
1456                Poll::Pending => Poll::Pending,
1457            }
1458        }
1459    }
1460
1461    impl Unpin for PendingRecordBatchStream {}
1462
1463    struct PendingDataSource {
1464        schema: GtSchemaRef,
1465        polled_tx: std::sync::Mutex<Option<oneshot::Sender<()>>>,
1466    }
1467
1468    impl DataSource for PendingDataSource {
1469        fn get_stream(
1470            &self,
1471            _request: ScanRequest,
1472        ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
1473            let (finish_tx, finish_rx) = oneshot::channel();
1474            let mut polled_tx = self.polled_tx.lock().map_err(|_| {
1475                BoxedError::new(PlainError::new(
1476                    "pending data source lock poisoned".to_string(),
1477                    StatusCode::Unexpected,
1478                ))
1479            })?;
1480            Ok(Box::pin(PendingRecordBatchStream {
1481                schema: self.schema.clone(),
1482                polled_tx: polled_tx.take(),
1483                _finish_tx: finish_tx,
1484                finish_rx: Box::pin(finish_rx),
1485            }))
1486        }
1487    }
1488
1489    struct NoopProcedureExecutor;
1490
1491    #[async_trait::async_trait]
1492    impl ProcedureExecutor for NoopProcedureExecutor {
1493        async fn submit_ddl_task(
1494            &self,
1495            _ctx: &ExecutorContext,
1496            _request: SubmitDdlTaskRequest,
1497        ) -> common_meta::error::Result<SubmitDdlTaskResponse> {
1498            common_meta::error::UnsupportedSnafu {
1499                operation: "submit_ddl_task",
1500            }
1501            .fail()
1502        }
1503
1504        async fn migrate_region(
1505            &self,
1506            _ctx: &ExecutorContext,
1507            _request: MigrateRegionRequest,
1508        ) -> common_meta::error::Result<MigrateRegionResponse> {
1509            common_meta::error::UnsupportedSnafu {
1510                operation: "migrate_region",
1511            }
1512            .fail()
1513        }
1514
1515        async fn reconcile(
1516            &self,
1517            _ctx: &ExecutorContext,
1518            _request: ReconcileRequest,
1519        ) -> common_meta::error::Result<ReconcileResponse> {
1520            common_meta::error::UnsupportedSnafu {
1521                operation: "reconcile",
1522            }
1523            .fail()
1524        }
1525
1526        async fn query_procedure_state(
1527            &self,
1528            _ctx: &ExecutorContext,
1529            _pid: &str,
1530        ) -> common_meta::error::Result<ProcedureStateResponse> {
1531            common_meta::error::UnsupportedSnafu {
1532                operation: "query_procedure_state",
1533            }
1534            .fail()
1535        }
1536
1537        async fn list_procedures(
1538            &self,
1539            _ctx: &ExecutorContext,
1540        ) -> common_meta::error::Result<ProcedureDetailResponse> {
1541            common_meta::error::UnsupportedSnafu {
1542                operation: "list_procedures",
1543            }
1544            .fail()
1545        }
1546    }
1547
1548    fn test_cache_registry(
1549        kv_backend: common_meta::kv_backend::KvBackendRef,
1550    ) -> TestResult<common_meta::cache::LayeredCacheRegistryRef> {
1551        Ok(Arc::new(
1552            cache::with_default_composite_cache_registry(
1553                LayeredCacheRegistryBuilder::default()
1554                    .add_cache_registry(cache::build_fundamental_cache_registry(kv_backend)),
1555            )
1556            .context(BuildCacheRegistrySnafu)?
1557            .build(),
1558        ))
1559    }
1560
1561    fn test_table_info(table_id: u32, table_name: &str) -> TestResult<TableInfo> {
1562        let schema = Arc::new(GtSchema::new(vec![
1563            ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
1564            ColumnSchema::new(
1565                "ts",
1566                ConcreteDataType::timestamp_millisecond_datatype(),
1567                false,
1568            )
1569            .with_time_index(true),
1570        ]));
1571        let table_meta = TableMetaBuilder::empty()
1572            .schema(schema)
1573            .primary_key_indices(vec![0])
1574            .value_indices(vec![1])
1575            .next_column_id(1024)
1576            .build()
1577            .with_context(|_| BuildTableMetaSnafu {
1578                table_name: table_name.to_string(),
1579            })?;
1580
1581        TableInfoBuilder::new(table_name, table_meta)
1582            .table_id(table_id)
1583            .build()
1584            .with_context(|_| BuildTableInfoSnafu {
1585                table_name: table_name.to_string(),
1586            })
1587    }
1588
1589    fn test_table(table_id: u32, table_name: &str) -> TestResult<table::TableRef> {
1590        let table_info = test_table_info(table_id, table_name)?;
1591        Ok(EmptyTable::from_table_info(&table_info))
1592    }
1593
1594    fn pending_table(
1595        table_id: u32,
1596        table_name: &str,
1597        polled_tx: oneshot::Sender<()>,
1598    ) -> TestResult<table::TableRef> {
1599        let table_info = test_table_info(table_id, table_name)?;
1600        let data_source = Arc::new(PendingDataSource {
1601            schema: table_info.meta.schema.clone(),
1602            polled_tx: std::sync::Mutex::new(Some(polled_tx)),
1603        });
1604
1605        Ok(Arc::new(Table::new(
1606            Arc::new(table_info),
1607            FilterPushDownType::Unsupported,
1608            data_source,
1609        )))
1610    }
1611
1612    async fn test_instance_with_tables(
1613        source_table: TableRef,
1614        target_table: TableRef,
1615    ) -> TestResult<Instance> {
1616        test_instance_with_plugins(source_table, target_table, Plugins::new()).await
1617    }
1618
1619    async fn test_instance_with_insert_select_interceptor(
1620        interceptor: SqlQueryInterceptorRef<Error>,
1621    ) -> TestResult<Instance> {
1622        let plugins = Plugins::new();
1623        plugins.insert::<SqlQueryInterceptorRef<Error>>(interceptor);
1624
1625        test_instance_with_plugins(
1626            test_table(1024, "source")?,
1627            test_table(1025, "target")?,
1628            plugins,
1629        )
1630        .await
1631    }
1632
1633    async fn test_instance_with_plugins(
1634        source_table: TableRef,
1635        target_table: TableRef,
1636        plugins: Plugins,
1637    ) -> TestResult<Instance> {
1638        let kv_backend = Arc::new(MemoryKvBackend::new());
1639        let process_manager = Arc::new(ProcessManager::new("test-frontend".to_string(), None));
1640        let catalog_manager = catalog::memory::MemoryCatalogManager::new_with_table(source_table);
1641        let target_table_name = "target";
1642        catalog_manager
1643            .register_table_sync(catalog::RegisterTableRequest {
1644                catalog: "greptime".to_string(),
1645                schema: "public".to_string(),
1646                table_name: target_table_name.to_string(),
1647                table_id: 1025,
1648                table: target_table,
1649            })
1650            .with_context(|_| RegisterTableSnafu {
1651                table_name: target_table_name.to_string(),
1652            })?;
1653        catalog_manager.register_process_list_table(process_manager.clone());
1654
1655        let cache_registry = test_cache_registry(kv_backend.clone())?;
1656
1657        FrontendBuilder::new(
1658            FrontendOptions::default(),
1659            kv_backend,
1660            cache_registry,
1661            catalog_manager,
1662            Arc::new(client::client_manager::NodeClients::default()),
1663            Arc::new(NoopProcedureExecutor),
1664            process_manager,
1665        )
1666        .with_plugin(plugins)
1667        .try_build()
1668        .await
1669        .context(BuildFrontendSnafu)
1670    }
1671
1672    async fn execute_one_sql(
1673        instance: &Instance,
1674        sql: &str,
1675        query_ctx: QueryContextRef,
1676    ) -> TestResult<Output> {
1677        let mut results = instance.do_query_inner(sql, query_ctx).await;
1678        ensure!(
1679            results.len() == 1,
1680            UnexpectedOutputCountSnafu {
1681                sql: sql.to_string(),
1682                actual: results.len(),
1683            }
1684        );
1685        results.remove(0).with_context(|_| ExecuteSqlSnafu {
1686            sql: sql.to_string(),
1687        })
1688    }
1689
1690    #[test]
1691    fn test_fast_legacy_check_deadlock_prevention() {
1692        // Create a DashMap to simulate the cache
1693        let cache = DashMap::new();
1694
1695        // Pre-populate cache with some entries
1696        cache.insert("metric1".to_string(), true); // legacy mode
1697        cache.insert("metric2".to_string(), false); // prom mode
1698        cache.insert("metric3".to_string(), true); // legacy mode
1699
1700        // Test case 1: Normal operation with cache hits
1701        let metric1 = "metric1".to_string();
1702        let metric4 = "metric4".to_string();
1703        let names1 = vec![&metric1, &metric4];
1704        let result = fast_legacy_check(&cache, &names1);
1705        assert!(result.is_ok());
1706        assert_eq!(result.unwrap(), Some(true)); // should return legacy mode
1707
1708        // Verify that metric4 was added to cache
1709        assert!(cache.contains_key("metric4"));
1710        assert!(*cache.get("metric4").unwrap().value());
1711
1712        // Test case 2: No cache hits
1713        let metric5 = "metric5".to_string();
1714        let metric6 = "metric6".to_string();
1715        let names2 = vec![&metric5, &metric6];
1716        let result = fast_legacy_check(&cache, &names2);
1717        assert!(result.is_ok());
1718        assert_eq!(result.unwrap(), None); // should return None as no cache hits
1719
1720        // Test case 3: Incompatible modes should return error
1721        let cache_incompatible = DashMap::new();
1722        cache_incompatible.insert("metric1".to_string(), true); // legacy
1723        cache_incompatible.insert("metric2".to_string(), false); // prom
1724        let metric1_test = "metric1".to_string();
1725        let metric2_test = "metric2".to_string();
1726        let names3 = vec![&metric1_test, &metric2_test];
1727        let result = fast_legacy_check(&cache_incompatible, &names3);
1728        assert!(result.is_err()); // should error due to incompatible modes
1729
1730        // Test case 4: Intensive concurrent access to test deadlock prevention
1731        // This test specifically targets the scenario where multiple threads
1732        // access the same cache entries simultaneously
1733        let cache_concurrent = Arc::new(DashMap::new());
1734        cache_concurrent.insert("shared_metric".to_string(), true);
1735
1736        let num_threads = 8;
1737        let operations_per_thread = 100;
1738        let barrier = Arc::new(Barrier::new(num_threads));
1739        let success_flag = Arc::new(AtomicBool::new(true));
1740
1741        let handles: Vec<_> = (0..num_threads)
1742            .map(|thread_id| {
1743                let cache_clone = Arc::clone(&cache_concurrent);
1744                let barrier_clone = Arc::clone(&barrier);
1745                let success_flag_clone = Arc::clone(&success_flag);
1746
1747                thread::spawn(move || {
1748                    // Wait for all threads to be ready
1749                    barrier_clone.wait();
1750
1751                    let start_time = Instant::now();
1752                    for i in 0..operations_per_thread {
1753                        // Each operation references existing cache entry and adds new ones
1754                        let shared_metric = "shared_metric".to_string();
1755                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
1756                        let names = vec![&shared_metric, &new_metric];
1757
1758                        match fast_legacy_check(&cache_clone, &names) {
1759                            Ok(_) => {}
1760                            Err(_) => {
1761                                success_flag_clone.store(false, Ordering::Relaxed);
1762                                return;
1763                            }
1764                        }
1765
1766                        // If the test takes too long, it likely means deadlock
1767                        if start_time.elapsed() > Duration::from_secs(10) {
1768                            success_flag_clone.store(false, Ordering::Relaxed);
1769                            return;
1770                        }
1771                    }
1772                })
1773            })
1774            .collect();
1775
1776        // Join all threads with timeout
1777        let start_time = Instant::now();
1778        for (i, handle) in handles.into_iter().enumerate() {
1779            let join_result = handle.join();
1780
1781            // Check if we're taking too long (potential deadlock)
1782            if start_time.elapsed() > Duration::from_secs(30) {
1783                panic!("Test timed out - possible deadlock detected!");
1784            }
1785
1786            if join_result.is_err() {
1787                panic!("Thread {} panicked during execution", i);
1788            }
1789        }
1790
1791        // Verify all operations completed successfully
1792        assert!(
1793            success_flag.load(Ordering::Relaxed),
1794            "Some operations failed"
1795        );
1796
1797        // Verify that many new entries were added (proving operations completed)
1798        let final_count = cache_concurrent.len();
1799        assert!(
1800            final_count > 1 + num_threads * operations_per_thread / 2,
1801            "Expected more cache entries, got {}",
1802            final_count
1803        );
1804    }
1805
1806    #[test]
1807    fn test_should_track_statement_process() {
1808        assert!(should_track_statement_process(&parse_one_sql(
1809            "SELECT * FROM demo"
1810        )));
1811        assert!(should_track_statement_process(&parse_one_sql(
1812            "INSERT INTO demo SELECT * FROM source"
1813        )));
1814        assert!(!should_track_statement_process(&parse_one_sql(
1815            "INSERT INTO demo VALUES (1)"
1816        )));
1817        assert!(!should_track_statement_process(&parse_one_sql(
1818            "INSERT INTO demo VALUES (now())"
1819        )));
1820    }
1821
1822    #[test]
1823    fn test_should_track_plan_process() {
1824        let select_stmt = parse_one_sql("SELECT * FROM demo");
1825        let insert_select_stmt = parse_one_sql("INSERT INTO demo SELECT * FROM source");
1826        let insert_values_stmt = parse_one_sql("INSERT INTO demo VALUES (now())");
1827
1828        let empty_plan = LogicalPlanBuilder::empty(false).build().unwrap();
1829        assert!(should_track_plan_process(Some(&select_stmt), &empty_plan));
1830        assert!(should_track_plan_process(
1831            Some(&insert_select_stmt),
1832            &insert_dml_plan()
1833        ));
1834        assert!(!should_track_plan_process(
1835            Some(&insert_values_stmt),
1836            &insert_dml_plan()
1837        ));
1838        assert!(!should_track_plan_process(None, &insert_dml_plan()));
1839    }
1840
1841    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1842    async fn test_insert_select_is_visible_in_show_processlist() -> TestResult<()> {
1843        let insert_sql = "INSERT INTO target SELECT * FROM source";
1844        let (started_tx, mut started_rx) = mpsc::unbounded_channel();
1845        let (finish_tx, finish_rx) = oneshot::channel();
1846        let interceptor = Arc::new(BlockingInsertSelectInterceptor::new(started_tx, finish_rx));
1847        let instance = Arc::new(test_instance_with_insert_select_interceptor(interceptor).await?);
1848
1849        let insert_task = tokio::spawn({
1850            let instance = instance.clone();
1851            async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await }
1852        });
1853
1854        tokio::time::timeout(Duration::from_secs(5), started_rx.recv())
1855            .await
1856            .context(InsertStartTimeoutSnafu)?
1857            .context(InsertStartChannelClosedSnafu)?;
1858
1859        let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43)).await?;
1860        let process_list = output.data.pretty_print().await;
1861        assert!(
1862            process_list.contains(insert_sql),
1863            "process list did not contain running insert:\n{process_list}"
1864        );
1865
1866        finish_tx
1867            .send(())
1868            .map_err(|_| ReleaseBlockedInsertSnafu.build())?;
1869        insert_task.await.context(InsertTaskPanicSnafu)??;
1870
1871        Ok(())
1872    }
1873
1874    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1875    async fn test_kill_query_cancels_insert_select() -> TestResult<()> {
1876        assert_kill_cancels_insert_select("KILL QUERY 4242").await
1877    }
1878
1879    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1880    async fn test_kill_process_id_cancels_insert_select() -> TestResult<()> {
1881        assert_kill_cancels_insert_select("KILL 'test-frontend/4242'").await
1882    }
1883
1884    async fn assert_kill_cancels_insert_select(kill_sql: &str) -> TestResult<()> {
1885        let insert_sql = "INSERT INTO target SELECT * FROM source";
1886        let (source_polled_tx, source_polled_rx) = oneshot::channel();
1887        let instance = Arc::new(
1888            test_instance_with_tables(
1889                pending_table(1024, "source", source_polled_tx)?,
1890                test_table(1025, "target")?,
1891            )
1892            .await?,
1893        );
1894
1895        let insert_task = tokio::spawn({
1896            let instance = instance.clone();
1897            async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await }
1898        });
1899
1900        tokio::time::timeout(Duration::from_secs(5), source_polled_rx)
1901            .await
1902            .context(SourcePollTimeoutSnafu)?
1903            .context(SourcePollChannelClosedSnafu)?;
1904
1905        let output = execute_one_sql(&instance, kill_sql, test_query_ctx(43)).await?;
1906        assert!(matches!(output.data, OutputData::AffectedRows(1)));
1907
1908        let insert_result = tokio::time::timeout(Duration::from_secs(5), insert_task)
1909            .await
1910            .context(InsertTaskTimeoutSnafu)?
1911            .context(InsertTaskPanicSnafu)?;
1912        let err = match insert_result {
1913            Ok(_) => return InsertSelectNotCancelledSnafu.fail(),
1914            Err(TestError::ExecuteSql { source, .. }) => source,
1915            Err(err) => return Err(err),
1916        };
1917        assert_eq!(StatusCode::Cancelled, err.status_code());
1918
1919        let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43)).await?;
1920        let process_list = output.data.pretty_print().await;
1921        assert!(
1922            !process_list.contains(insert_sql),
1923            "process list still contains killed insert:\n{process_list}"
1924        );
1925
1926        Ok(())
1927    }
1928
1929    fn insert_dml_plan() -> LogicalPlan {
1930        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1931            "value",
1932            DataType::Int64,
1933            true,
1934        )]));
1935        let target = Arc::new(LogicalTableSource::new(schema));
1936        let input = LogicalPlanBuilder::empty(false).build().unwrap();
1937
1938        LogicalPlanBuilder::insert_into(input, "demo", target, InsertOp::Append)
1939            .unwrap()
1940            .build()
1941            .unwrap()
1942    }
1943
1944    #[test]
1945    fn test_exec_validation() {
1946        let query_ctx = QueryContext::arc();
1947        let plugins: Plugins = Plugins::new();
1948        plugins.insert(QueryOptions {
1949            disallow_cross_catalog_query: true,
1950        });
1951
1952        let sql = r#"
1953        SELECT * FROM demo;
1954        EXPLAIN SELECT * FROM demo;
1955        CREATE DATABASE test_database;
1956        SHOW DATABASES;
1957        "#;
1958        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
1959        assert_eq!(stmts.len(), 4);
1960        for stmt in stmts {
1961            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
1962            re.unwrap();
1963        }
1964
1965        let sql = r#"
1966        SHOW CREATE TABLE demo;
1967        ALTER TABLE demo ADD COLUMN new_col INT;
1968        "#;
1969        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
1970        assert_eq!(stmts.len(), 2);
1971        for stmt in stmts {
1972            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
1973            re.unwrap();
1974        }
1975
1976        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
1977            // test right
1978            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
1979            for (catalog, schema) in right {
1980                let sql = do_fmt(template_sql, catalog, schema);
1981                do_test(&sql, plugins.clone(), query_ctx, true);
1982            }
1983
1984            let wrong = vec![
1985                ("wrongcatalog.", "public."),
1986                ("wrongcatalog.", "wrongschema."),
1987            ];
1988            for (catalog, schema) in wrong {
1989                let sql = do_fmt(template_sql, catalog, schema);
1990                do_test(&sql, plugins.clone(), query_ctx, false);
1991            }
1992        }
1993
1994        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
1995            let vars = HashMap::from([
1996                ("catalog".to_string(), catalog),
1997                ("schema".to_string(), schema),
1998            ]);
1999            template.format(&vars).unwrap()
2000        }
2001
2002        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
2003            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
2004            let re = check_permission(plugins, stmt, query_ctx);
2005            if is_ok {
2006                re.unwrap();
2007            } else {
2008                assert!(re.is_err());
2009            }
2010        }
2011
2012        // test insert
2013        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
2014        replace_test(sql, plugins.clone(), &query_ctx);
2015
2016        // test create table
2017        let sql = r#"CREATE TABLE {catalog}{schema}demo(
2018                            host STRING,
2019                            ts TIMESTAMP,
2020                            TIME INDEX (ts),
2021                            PRIMARY KEY(host)
2022                        ) engine=mito;"#;
2023        replace_test(sql, plugins.clone(), &query_ctx);
2024
2025        // test drop table
2026        let sql = "DROP TABLE {catalog}{schema}demo;";
2027        replace_test(sql, plugins.clone(), &query_ctx);
2028
2029        // test show tables
2030        let sql = "SHOW TABLES FROM public";
2031        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
2032        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();
2033
2034        let sql = "SHOW TABLES FROM private";
2035        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
2036        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
2037        assert!(re.is_ok());
2038
2039        // test describe table
2040        let sql = "DESC TABLE {catalog}{schema}demo;";
2041        replace_test(sql, plugins.clone(), &query_ctx);
2042
2043        let comment_flow_cases = [
2044            ("COMMENT ON FLOW my_flow IS 'comment';", true),
2045            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
2046            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
2047        ];
2048        for (sql, is_ok) in comment_flow_cases {
2049            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
2050            let result = check_permission(plugins.clone(), stmt, &query_ctx);
2051            assert_eq!(result.is_ok(), is_ok);
2052        }
2053
2054        let show_flow_cases = [
2055            ("SHOW CREATE FLOW my_flow;", true),
2056            ("SHOW CREATE FLOW greptime.my_flow;", true),
2057            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
2058        ];
2059        for (sql, is_ok) in show_flow_cases {
2060            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
2061            let result = check_permission(plugins.clone(), stmt, &query_ctx);
2062            assert_eq!(result.is_ok(), is_ok);
2063        }
2064    }
2065}