Skip to main content

frontend/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod dashboard;
17mod grpc;
18mod influxdb;
19mod jaeger;
20mod log_handler;
21mod logs;
22mod opentsdb;
23mod otlp;
24pub mod prom_store;
25mod promql;
26mod region_query;
27pub mod standalone;
28
29use std::pin::Pin;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, atomic};
32use std::time::{Duration, SystemTime};
33
34use async_stream::stream;
35use async_trait::async_trait;
36use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
37use catalog::CatalogManagerRef;
38use catalog::process_manager::{
39    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
40};
41use client::OutputData;
42use common_base::Plugins;
43use common_base::cancellation::CancellableFuture;
44use common_error::ext::{BoxedError, ErrorExt};
45use common_event_recorder::EventRecorderRef;
46use common_meta::cache_invalidator::CacheInvalidatorRef;
47use common_meta::key::TableMetadataManagerRef;
48use common_meta::key::table_name::TableNameKey;
49use common_meta::node_manager::NodeManagerRef;
50use common_meta::procedure_executor::ProcedureExecutorRef;
51use common_query::Output;
52use common_recordbatch::RecordBatchStreamWrapper;
53use common_recordbatch::error::StreamTimeoutSnafu;
54use common_telemetry::logging::SlowQueryOptions;
55use common_telemetry::{debug, error, tracing};
56use dashmap::DashMap;
57use datafusion_expr::LogicalPlan;
58use futures::{Stream, StreamExt};
59use lazy_static::lazy_static;
60use operator::delete::DeleterRef;
61use operator::insert::InserterRef;
62use operator::statement::{StatementExecutor, StatementExecutorRef};
63use partition::manager::PartitionRuleManagerRef;
64use pipeline::pipeline_operator::PipelineOperator;
65use prometheus::HistogramTimer;
66use promql_parser::label::Matcher;
67use query::QueryEngineRef;
68use query::metrics::OnDone;
69use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
70use query::query_engine::DescribeResult;
71use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
72use servers::error::{
73    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
74    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
75};
76use servers::interceptor::{
77    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
78};
79use servers::otlp::metrics::legacy_normalize_otlp_name;
80use servers::prometheus_handler::PrometheusHandler;
81use servers::query_handler::sql::SqlQueryHandler;
82use session::context::{Channel, QueryContextRef};
83use session::table_name::table_idents_to_full_name;
84use snafu::prelude::*;
85use sql::ast::ObjectNamePartExt;
86use sql::dialect::Dialect;
87use sql::parser::{ParseOptions, ParserContext};
88use sql::statements::comment::CommentObject;
89use sql::statements::copy::{CopyDatabase, CopyTable};
90use sql::statements::statement::Statement;
91use sql::statements::tql::Tql;
92use sqlparser::ast::ObjectName;
93pub use standalone::StandaloneDatanodeManager;
94use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
95use tracing::Span;
96
97use crate::error::{
98    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
99    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
100    StatementTimeoutSnafu, TableOperationSnafu,
101};
102use crate::service_config::InfluxdbMergeMode;
103use crate::stream_wrapper::CancellableStreamWrapper;
104
105lazy_static! {
106    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
107}
108
109/// The frontend instance contains necessary components, and implements many
110/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
111/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
112#[derive(Clone)]
113pub struct Instance {
114    frontend_peer_addr: String,
115    catalog_manager: CatalogManagerRef,
116    pipeline_operator: Arc<PipelineOperator>,
117    statement_executor: Arc<StatementExecutor>,
118    query_engine: QueryEngineRef,
119    plugins: Plugins,
120    inserter: InserterRef,
121    deleter: DeleterRef,
122    table_metadata_manager: TableMetadataManagerRef,
123    event_recorder: Option<EventRecorderRef>,
124    process_manager: ProcessManagerRef,
125    slow_query_options: SlowQueryOptions,
126    influxdb_default_merge_mode: InfluxdbMergeMode,
127    suspend: Arc<AtomicBool>,
128
129    // cache for otlp metrics
130    // first layer key: db-string
131    // key: direct input metric name
132    // value: if runs in legacy mode
133    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
134}
135
136impl Instance {
137    pub fn frontend_peer_addr(&self) -> &str {
138        &self.frontend_peer_addr
139    }
140
141    pub fn catalog_manager(&self) -> &CatalogManagerRef {
142        &self.catalog_manager
143    }
144
145    pub fn query_engine(&self) -> &QueryEngineRef {
146        &self.query_engine
147    }
148
149    pub fn plugins(&self) -> &Plugins {
150        &self.plugins
151    }
152
153    pub fn statement_executor(&self) -> &StatementExecutorRef {
154        &self.statement_executor
155    }
156
157    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
158        &self.table_metadata_manager
159    }
160
161    pub fn inserter(&self) -> &InserterRef {
162        &self.inserter
163    }
164
165    pub fn process_manager(&self) -> &ProcessManagerRef {
166        &self.process_manager
167    }
168
169    pub fn node_manager(&self) -> &NodeManagerRef {
170        self.inserter.node_manager()
171    }
172
173    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
174        self.inserter.partition_manager()
175    }
176
177    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
178        self.statement_executor.cache_invalidator()
179    }
180
181    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
182        self.statement_executor.procedure_executor()
183    }
184
185    pub fn suspend_state(&self) -> Arc<AtomicBool> {
186        self.suspend.clone()
187    }
188
189    pub(crate) fn is_suspended(&self) -> bool {
190        self.suspend.load(atomic::Ordering::Relaxed)
191    }
192}
193
194fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
195    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
196}
197
198impl Instance {
199    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
200        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
201
202        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
203        let query_interceptor = query_interceptor.as_ref();
204
205        let is_readonly_stmt = stmt.is_readonly();
206        if should_track_statement_process(&stmt) {
207            let slow_query_timer = if is_readonly_stmt {
208                self.slow_query_options
209                    .enable
210                    .then(|| self.event_recorder.clone())
211                    .flatten()
212                    .map(|event_recorder| {
213                        SlowQueryTimer::new(
214                            CatalogQueryStatement::Sql(stmt.clone()),
215                            self.slow_query_options.threshold,
216                            self.slow_query_options.sample_ratio,
217                            self.slow_query_options.record_type,
218                            event_recorder,
219                        )
220                    })
221            } else {
222                None
223            };
224
225            let ticket = self.process_manager.register_query(
226                query_ctx.current_catalog().to_string(),
227                vec![query_ctx.current_schema()],
228                stmt.to_string(),
229                query_ctx.conn_info().to_string(),
230                Some(query_ctx.process_id()),
231                slow_query_timer,
232            );
233
234            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
235
236            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
237                .await
238                .map_err(|_| error::CancelledSnafu.build())?
239                .map(|output| {
240                    let Output { meta, data } = output;
241
242                    let data = match data {
243                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
244                            CancellableStreamWrapper::new(stream, ticket),
245                        )),
246                        other => other,
247                    };
248                    Output { data, meta }
249                })
250        } else {
251            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
252                .await
253        }
254    }
255
256    async fn exec_statement_with_timeout(
257        &self,
258        stmt: Statement,
259        query_ctx: QueryContextRef,
260        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
261    ) -> Result<Output> {
262        let timeout = derive_timeout(&stmt, &query_ctx);
263        match timeout {
264            Some(timeout) => {
265                let start = tokio::time::Instant::now();
266                let output = tokio::time::timeout(
267                    timeout,
268                    self.exec_statement(stmt, query_ctx, query_interceptor),
269                )
270                .await
271                .map_err(|_| StatementTimeoutSnafu.build())??;
272                // compute remaining timeout
273                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
274                attach_timeout(output, remaining_timeout)
275            }
276            None => {
277                self.exec_statement(stmt, query_ctx, query_interceptor)
278                    .await
279            }
280        }
281    }
282
283    async fn exec_statement(
284        &self,
285        stmt: Statement,
286        query_ctx: QueryContextRef,
287        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
288    ) -> Result<Output> {
289        match stmt {
290            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
291                // TODO: remove this when format is supported in datafusion
292                if let Statement::Explain(explain) = &stmt
293                    && let Some(format) = explain.format()
294                {
295                    query_ctx.set_explain_format(format.to_string());
296                }
297
298                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
299                    .await
300            }
301            Statement::Tql(tql) => {
302                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
303                    .await
304            }
305            _ => {
306                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
307                self.statement_executor
308                    .execute_sql(stmt, query_ctx)
309                    .await
310                    .context(TableOperationSnafu)
311            }
312        }
313    }
314
315    async fn plan_and_exec_sql(
316        &self,
317        stmt: Statement,
318        query_ctx: &QueryContextRef,
319        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
320    ) -> Result<Output> {
321        let stmt = QueryStatement::Sql(stmt);
322        let plan = self
323            .statement_executor
324            .plan(&stmt, query_ctx.clone())
325            .await?;
326        let QueryStatement::Sql(stmt) = stmt else {
327            unreachable!()
328        };
329        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
330
331        self.statement_executor
332            .exec_plan(plan, query_ctx.clone())
333            .await
334            .context(TableOperationSnafu)
335    }
336
337    async fn plan_and_exec_tql(
338        &self,
339        query_ctx: &QueryContextRef,
340        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
341        tql: Tql,
342    ) -> Result<Output> {
343        let plan = self
344            .statement_executor
345            .plan_tql(tql.clone(), query_ctx)
346            .await?;
347        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
348        self.statement_executor
349            .exec_plan(plan, query_ctx.clone())
350            .await
351            .context(TableOperationSnafu)
352    }
353
354    async fn check_otlp_legacy(
355        &self,
356        names: &[&String],
357        ctx: QueryContextRef,
358    ) -> server_error::Result<bool> {
359        let db_string = ctx.get_db_string();
360        // fast cache check
361        let cache = self
362            .otlp_metrics_table_legacy_cache
363            .entry(db_string.clone())
364            .or_default();
365        if let Some(flag) = fast_legacy_check(&cache, names)? {
366            return Ok(flag);
367        }
368        // release cache reference to avoid lock contention
369        drop(cache);
370
371        let catalog = ctx.current_catalog();
372        let schema = ctx.current_schema();
373
374        // query legacy table names
375        let normalized_names = names
376            .iter()
377            .map(|n| legacy_normalize_otlp_name(n))
378            .collect::<Vec<_>>();
379        let table_names = normalized_names
380            .iter()
381            .map(|n| TableNameKey::new(catalog, &schema, n))
382            .collect::<Vec<_>>();
383        let table_values = self
384            .table_metadata_manager()
385            .table_name_manager()
386            .batch_get(table_names)
387            .await
388            .context(CommonMetaSnafu)?;
389        let table_ids = table_values
390            .into_iter()
391            .filter_map(|v| v.map(|vi| vi.table_id()))
392            .collect::<Vec<_>>();
393
394        // means no existing table is found, use new mode
395        if table_ids.is_empty() {
396            let cache = self
397                .otlp_metrics_table_legacy_cache
398                .entry(db_string)
399                .or_default();
400            names.iter().for_each(|name| {
401                cache.insert((*name).clone(), false);
402            });
403            return Ok(false);
404        }
405
406        // has existing table, check table options
407        let table_infos = self
408            .table_metadata_manager()
409            .table_info_manager()
410            .batch_get(&table_ids)
411            .await
412            .context(CommonMetaSnafu)?;
413        let options = table_infos
414            .values()
415            .map(|info| {
416                info.table_info
417                    .meta
418                    .options
419                    .extra_options
420                    .get(OTLP_METRIC_COMPAT_KEY)
421                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
422            })
423            .collect::<Vec<_>>();
424        let cache = self
425            .otlp_metrics_table_legacy_cache
426            .entry(db_string)
427            .or_default();
428        if !options.is_empty() {
429            // check value consistency
430            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
431            let has_legacy = options
432                .iter()
433                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
434            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
435            let flag = has_legacy;
436            names.iter().for_each(|name| {
437                cache.insert((*name).clone(), flag);
438            });
439            Ok(flag)
440        } else {
441            // no table info, use new mode
442            names.iter().for_each(|name| {
443                cache.insert((*name).clone(), false);
444            });
445            Ok(false)
446        }
447    }
448}
449
450fn fast_legacy_check(
451    cache: &DashMap<String, bool>,
452    names: &[&String],
453) -> server_error::Result<Option<bool>> {
454    let hit_cache = names
455        .iter()
456        .filter_map(|name| cache.get(*name))
457        .collect::<Vec<_>>();
458    if !hit_cache.is_empty() {
459        let hit_legacy = hit_cache.iter().any(|en| *en.value());
460        let hit_prom = hit_cache.iter().any(|en| !*en.value());
461
462        // hit but have true and false, means both legacy and new mode are used
463        // we cannot handle this case, so return error
464        // add doc links in err msg later
465        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
466
467        let flag = hit_legacy;
468        // drop hit_cache to release references before inserting to avoid deadlock
469        drop(hit_cache);
470
471        // set cache for all names
472        names.iter().for_each(|name| {
473            if !cache.contains_key(*name) {
474                cache.insert((*name).clone(), flag);
475            }
476        });
477        Ok(Some(flag))
478    } else {
479        Ok(None)
480    }
481}
482
483/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
484/// For MySQL, it applies only to read-only statements.
485fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
486    let query_timeout = query_ctx.query_timeout()?;
487    if query_timeout.is_zero() {
488        return None;
489    }
490    match query_ctx.channel() {
491        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
492        Channel::Postgres => Some(query_timeout),
493        _ => None,
494    }
495}
496
497/// Derives timeout for plan execution.
498fn derive_timeout_for_plan(plan: &LogicalPlan, query_ctx: &QueryContextRef) -> Option<Duration> {
499    let query_timeout = query_ctx.query_timeout()?;
500    if query_timeout.is_zero() {
501        return None;
502    }
503    match query_ctx.channel() {
504        Channel::Mysql if is_readonly_plan(plan) => Some(query_timeout),
505        Channel::Postgres => Some(query_timeout),
506        _ => None,
507    }
508}
509
510fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
511    if timeout.is_zero() {
512        return StatementTimeoutSnafu.fail();
513    }
514
515    let output = match output.data {
516        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
517        OutputData::Stream(mut stream) => {
518            let schema = stream.schema();
519            let s = Box::pin(stream! {
520                let mut start = tokio::time::Instant::now();
521                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
522                    yield item;
523
524                    let now = tokio::time::Instant::now();
525                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
526                    start = now;
527                    // tokio::time::timeout may not return an error immediately when timeout is 0.
528                    if timeout.is_zero() {
529                        StreamTimeoutSnafu.fail()?;
530                    }
531                }
532            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
533            let stream = RecordBatchStreamWrapper {
534                schema,
535                stream: s,
536                output_ordering: None,
537                metrics: Default::default(),
538                span: Span::current(),
539            };
540            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
541        }
542    };
543
544    Ok(output)
545}
546
547impl Instance {
548    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
549    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
550        if self.is_suspended() {
551            return vec![error::SuspendedSnafu {}.fail()];
552        }
553
554        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
555        let query_interceptor = query_interceptor_opt.as_ref();
556        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
557            Ok(q) => q,
558            Err(e) => return vec![Err(e)],
559        };
560
561        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
562        let checker = checker_ref.as_ref();
563
564        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
565            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
566        {
567            Ok(stmts) => {
568                if stmts.is_empty() {
569                    return vec![
570                        InvalidSqlSnafu {
571                            err_msg: "empty statements",
572                        }
573                        .fail(),
574                    ];
575                }
576
577                let mut results = Vec::with_capacity(stmts.len());
578                for stmt in stmts {
579                    if let Err(e) = checker
580                        .check_permission(
581                            query_ctx.current_user(),
582                            PermissionReq::SqlStatement(&stmt),
583                        )
584                        .context(PermissionSnafu)
585                    {
586                        results.push(Err(e));
587                        break;
588                    }
589
590                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
591                        Ok(output) => {
592                            let output_result =
593                                query_interceptor.post_execute(output, query_ctx.clone());
594                            results.push(output_result);
595                        }
596                        Err(e) => {
597                            if e.status_code().should_log_error() {
598                                error!(e; "Failed to execute query: {stmt}");
599                            } else {
600                                debug!("Failed to execute query: {stmt}, {e}");
601                            }
602                            results.push(Err(e));
603                            break;
604                        }
605                    }
606                }
607                results
608            }
609            Err(e) => {
610                vec![Err(e)]
611            }
612        }
613    }
614
615    async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
616        self.query_engine
617            .execute(plan, query_ctx)
618            .await
619            .context(ExecLogicalPlanSnafu)
620    }
621
622    async fn exec_plan_with_timeout(
623        &self,
624        plan: LogicalPlan,
625        query_ctx: QueryContextRef,
626    ) -> Result<Output> {
627        let timeout = derive_timeout_for_plan(&plan, &query_ctx);
628        match timeout {
629            Some(timeout) => {
630                let start = tokio::time::Instant::now();
631                let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
632                    .await
633                    .map_err(|_| StatementTimeoutSnafu.build())??;
634                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
635                attach_timeout(output, remaining_timeout)
636            }
637            None => self.exec_plan(plan, query_ctx).await,
638        }
639    }
640
641    async fn do_exec_plan_inner(
642        &self,
643        plan: LogicalPlan,
644        stmt: Option<Statement>,
645        query_ctx: QueryContextRef,
646    ) -> Result<Output> {
647        ensure!(!self.is_suspended(), error::SuspendedSnafu);
648
649        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
650        let query_interceptor = query_interceptor_opt.as_ref();
651
652        if let Some(ref s) = stmt {
653            query_interceptor.pre_execute(s, Some(&plan), query_ctx.clone())?;
654        }
655
656        let query = stmt
657            .as_ref()
658            .map(|s| s.to_string())
659            .unwrap_or_else(|| plan.display_indent().to_string());
660
661        let plan_is_readonly = is_readonly_plan(&plan);
662        let result = if should_track_plan_process(stmt.as_ref(), &plan) {
663            let slow_query_timer = if plan_is_readonly {
664                self.slow_query_options
665                    .enable
666                    .then(|| self.event_recorder.clone())
667                    .flatten()
668                    .map(|event_recorder| {
669                        SlowQueryTimer::new(
670                            CatalogQueryStatement::Plan(query.clone()),
671                            self.slow_query_options.threshold,
672                            self.slow_query_options.sample_ratio,
673                            self.slow_query_options.record_type,
674                            event_recorder,
675                        )
676                    })
677            } else {
678                None
679            };
680
681            let ticket = self.process_manager.register_query(
682                query_ctx.current_catalog().to_string(),
683                vec![query_ctx.current_schema()],
684                query,
685                query_ctx.conn_info().to_string(),
686                Some(query_ctx.process_id()),
687                slow_query_timer,
688            );
689
690            let query_fut = self.exec_plan_with_timeout(plan, query_ctx.clone());
691
692            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
693                .await
694                .map_err(|_| error::CancelledSnafu.build())?
695                .map(|output| {
696                    let Output { meta, data } = output;
697
698                    let data = match data {
699                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
700                            CancellableStreamWrapper::new(stream, ticket),
701                        )),
702                        other => other,
703                    };
704                    Output { data, meta }
705                })
706        } else {
707            self.exec_plan_with_timeout(plan, query_ctx.clone()).await
708        };
709
710        result.and_then(|output| query_interceptor.post_execute(output, query_ctx))
711    }
712
713    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
714    async fn do_promql_query_inner(
715        &self,
716        query: &PromQuery,
717        query_ctx: QueryContextRef,
718    ) -> Vec<Result<Output>> {
719        if self.is_suspended() {
720            return vec![error::SuspendedSnafu {}.fail()];
721        }
722
723        // check will be done in prometheus handler's do_query
724        let result = PrometheusHandler::do_query(self, query, query_ctx)
725            .await
726            .with_context(|_| ExecutePromqlSnafu {
727                query: format!("{query:?}"),
728            });
729        vec![result]
730    }
731
732    async fn do_describe_inner(
733        &self,
734        stmt: Statement,
735        query_ctx: QueryContextRef,
736    ) -> Result<Option<DescribeResult>> {
737        ensure!(!self.is_suspended(), error::SuspendedSnafu);
738
739        // EXPLAIN / EXPLAIN ANALYZE wrap an inner statement; describe them when the
740        // wrapped statement is something we already plan (so that bind parameters
741        // in the inner query get their types inferred). See #8029.
742        let is_inner_plannable = |s: &Statement| {
743            matches!(
744                s,
745                Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
746            )
747        };
748        let plannable = is_inner_plannable(&stmt)
749            || matches!(&stmt, Statement::Explain(explain) if is_inner_plannable(explain.statement.as_ref()));
750
751        if plannable {
752            self.plugins
753                .get::<PermissionCheckerRef>()
754                .as_ref()
755                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
756                .context(PermissionSnafu)?;
757
758            let plan = self
759                .query_engine
760                .planner()
761                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
762                .await
763                .context(PlanStatementSnafu)?;
764            self.query_engine
765                .describe(plan, query_ctx)
766                .await
767                .map(Some)
768                .context(error::DescribeStatementSnafu)
769        } else {
770            Ok(None)
771        }
772    }
773
774    async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
775        self.catalog_manager
776            .schema_exists(catalog, schema, None)
777            .await
778            .context(error::CatalogSnafu)
779    }
780}
781
782#[async_trait]
783impl SqlQueryHandler for Instance {
784    async fn do_query(
785        &self,
786        query: &str,
787        query_ctx: QueryContextRef,
788    ) -> Vec<server_error::Result<Output>> {
789        self.do_query_inner(query, query_ctx)
790            .await
791            .into_iter()
792            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
793            .collect()
794    }
795
796    async fn do_exec_plan(
797        &self,
798        plan: LogicalPlan,
799        stmt: Option<Statement>,
800        query_ctx: QueryContextRef,
801    ) -> server_error::Result<Output> {
802        self.do_exec_plan_inner(plan, stmt, query_ctx)
803            .await
804            .map_err(BoxedError::new)
805            .context(server_error::ExecutePlanSnafu)
806    }
807
808    async fn do_promql_query(
809        &self,
810        query: &PromQuery,
811        query_ctx: QueryContextRef,
812    ) -> Vec<server_error::Result<Output>> {
813        self.do_promql_query_inner(query, query_ctx)
814            .await
815            .into_iter()
816            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
817            .collect()
818    }
819
820    async fn do_describe(
821        &self,
822        stmt: Statement,
823        query_ctx: QueryContextRef,
824    ) -> server_error::Result<Option<DescribeResult>> {
825        self.do_describe_inner(stmt, query_ctx)
826            .await
827            .map_err(BoxedError::new)
828            .context(server_error::DescribeStatementSnafu)
829    }
830
831    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
832        self.is_valid_schema_inner(catalog, schema)
833            .await
834            .map_err(BoxedError::new)
835            .context(server_error::CheckDatabaseValiditySnafu)
836    }
837}
838
839/// Attaches a timer to the output and observes it once the output is exhausted.
840pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
841    match output.data {
842        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
843        OutputData::Stream(stream) => {
844            let stream = OnDone::new(stream, move || {
845                timer.observe_duration();
846            });
847            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
848        }
849    }
850}
851
852#[async_trait]
853impl PrometheusHandler for Instance {
854    #[tracing::instrument(skip_all)]
855    async fn do_query(
856        &self,
857        query: &PromQuery,
858        query_ctx: QueryContextRef,
859    ) -> server_error::Result<Output> {
860        let interceptor = self
861            .plugins
862            .get::<PromQueryInterceptorRef<server_error::Error>>();
863
864        self.plugins
865            .get::<PermissionCheckerRef>()
866            .as_ref()
867            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
868            .context(AuthSnafu)?;
869
870        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
871            ParsePromQLSnafu {
872                query: query.clone(),
873            }
874        })?;
875
876        let plan = self
877            .statement_executor
878            .plan(&stmt, query_ctx.clone())
879            .await
880            .map_err(BoxedError::new)
881            .context(ExecuteQuerySnafu)?;
882
883        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
884
885        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
886        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
887            CatalogQueryStatement::Promql(eval_stmt, alias)
888        } else {
889            // It should not happen since the query is already parsed successfully.
890            return UnexpectedResultSnafu {
891                reason: "The query should always be promql.".to_string(),
892            }
893            .fail();
894        };
895        let query = query_statement.to_string();
896
897        let slow_query_timer = self
898            .slow_query_options
899            .enable
900            .then(|| self.event_recorder.clone())
901            .flatten()
902            .map(|event_recorder| {
903                SlowQueryTimer::new(
904                    query_statement,
905                    self.slow_query_options.threshold,
906                    self.slow_query_options.sample_ratio,
907                    self.slow_query_options.record_type,
908                    event_recorder,
909                )
910            });
911
912        let ticket = self.process_manager.register_query(
913            query_ctx.current_catalog().to_string(),
914            vec![query_ctx.current_schema()],
915            query,
916            query_ctx.conn_info().to_string(),
917            Some(query_ctx.process_id()),
918            slow_query_timer,
919        );
920
921        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
922
923        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
924            .await
925            .map_err(|_| servers::error::CancelledSnafu.build())?
926            .map(|output| {
927                let Output { meta, data } = output;
928                let data = match data {
929                    OutputData::Stream(stream) => {
930                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
931                    }
932                    other => other,
933                };
934                Output { data, meta }
935            })
936            .map_err(BoxedError::new)
937            .context(ExecuteQuerySnafu)?;
938
939        Ok(interceptor.post_execute(output, query_ctx)?)
940    }
941
942    async fn query_metric_names(
943        &self,
944        matchers: Vec<Matcher>,
945        ctx: &QueryContextRef,
946    ) -> server_error::Result<Vec<String>> {
947        self.handle_query_metric_names(matchers, ctx)
948            .await
949            .map_err(BoxedError::new)
950            .context(ExecuteQuerySnafu)
951    }
952
953    async fn query_label_values(
954        &self,
955        metric: String,
956        label_name: String,
957        matchers: Vec<Matcher>,
958        start: SystemTime,
959        end: SystemTime,
960        ctx: &QueryContextRef,
961    ) -> server_error::Result<Vec<String>> {
962        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
963            .await
964            .map_err(BoxedError::new)
965            .context(ExecuteQuerySnafu)
966    }
967
968    fn catalog_manager(&self) -> CatalogManagerRef {
969        self.catalog_manager.clone()
970    }
971}
972
/// Validates access to `stmt.database` when the statement names one.
///
/// The named database is checked against the current catalog of `$query_ctx`.
/// On failure the enclosing function returns early with a `SqlExecIntercepted`
/// error; statements without a database are left unchecked.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = $stmt.database.as_ref() {
            let catalog = $query_ctx.current_catalog();
            validate_catalog_and_schema(catalog, database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
983
984pub fn check_permission(
985    plugins: Plugins,
986    stmt: &Statement,
987    query_ctx: &QueryContextRef,
988) -> Result<()> {
989    let need_validate = plugins
990        .get::<QueryOptions>()
991        .map(|opts| opts.disallow_cross_catalog_query)
992        .unwrap_or_default();
993
994    if !need_validate {
995        return Ok(());
996    }
997
998    match stmt {
999        // Will be checked in execution.
1000        // TODO(dennis): add a hook for admin commands.
1001        Statement::Admin(_) => {}
1002        // These are executed by query engine, and will be checked there.
1003        Statement::Query(_)
1004        | Statement::Explain(_)
1005        | Statement::Tql(_)
1006        | Statement::Delete(_)
1007        | Statement::DeclareCursor(_)
1008        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
1009        // database ops won't be checked
1010        Statement::CreateDatabase(_)
1011        | Statement::ShowDatabases(_)
1012        | Statement::DropDatabase(_)
1013        | Statement::AlterDatabase(_)
1014        | Statement::DropFlow(_)
1015        | Statement::Use(_) => {}
1016        #[cfg(feature = "enterprise")]
1017        Statement::DropTrigger(_) => {}
1018        Statement::ShowCreateDatabase(stmt) => {
1019            validate_database(&stmt.database_name, query_ctx)?;
1020        }
1021        Statement::ShowCreateTable(stmt) => {
1022            validate_param(&stmt.table_name, query_ctx)?;
1023        }
1024        Statement::ShowCreateFlow(stmt) => {
1025            validate_flow(&stmt.flow_name, query_ctx)?;
1026        }
1027        #[cfg(feature = "enterprise")]
1028        Statement::ShowCreateTrigger(stmt) => {
1029            validate_param(&stmt.trigger_name, query_ctx)?;
1030        }
1031        Statement::ShowCreateView(stmt) => {
1032            validate_param(&stmt.view_name, query_ctx)?;
1033        }
1034        Statement::CreateExternalTable(stmt) => {
1035            validate_param(&stmt.name, query_ctx)?;
1036        }
1037        Statement::CreateFlow(stmt) => {
1038            // TODO: should also validate source table name here?
1039            validate_param(&stmt.sink_table_name, query_ctx)?;
1040        }
1041        #[cfg(feature = "enterprise")]
1042        Statement::CreateTrigger(stmt) => {
1043            validate_param(&stmt.trigger_name, query_ctx)?;
1044        }
1045        Statement::CreateView(stmt) => {
1046            validate_param(&stmt.name, query_ctx)?;
1047        }
1048        Statement::AlterTable(stmt) => {
1049            validate_param(stmt.table_name(), query_ctx)?;
1050        }
1051        #[cfg(feature = "enterprise")]
1052        Statement::AlterTrigger(_) => {}
1053        // set/show variable now only alter/show variable in session
1054        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
1055        // show charset and show collation won't be checked
1056        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
1057
1058        Statement::Comment(comment) => match &comment.object {
1059            CommentObject::Table(table) => validate_param(table, query_ctx)?,
1060            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
1061            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
1062        },
1063
1064        Statement::Insert(insert) => {
1065            let name = insert.table_name().context(ParseSqlSnafu)?;
1066            validate_param(name, query_ctx)?;
1067        }
1068        Statement::CreateTable(stmt) => {
1069            validate_param(&stmt.name, query_ctx)?;
1070        }
1071        Statement::CreateTableLike(stmt) => {
1072            validate_param(&stmt.table_name, query_ctx)?;
1073            validate_param(&stmt.source_name, query_ctx)?;
1074        }
1075        Statement::DropTable(drop_stmt) => {
1076            for table_name in drop_stmt.table_names() {
1077                validate_param(table_name, query_ctx)?;
1078            }
1079        }
1080        Statement::DropView(stmt) => {
1081            validate_param(&stmt.view_name, query_ctx)?;
1082        }
1083        Statement::ShowTables(stmt) => {
1084            validate_db_permission!(stmt, query_ctx);
1085        }
1086        Statement::ShowTableStatus(stmt) => {
1087            validate_db_permission!(stmt, query_ctx);
1088        }
1089        Statement::ShowColumns(stmt) => {
1090            validate_db_permission!(stmt, query_ctx);
1091        }
1092        Statement::ShowIndex(stmt) => {
1093            validate_db_permission!(stmt, query_ctx);
1094        }
1095        Statement::ShowRegion(stmt) => {
1096            validate_db_permission!(stmt, query_ctx);
1097        }
1098        Statement::ShowViews(stmt) => {
1099            validate_db_permission!(stmt, query_ctx);
1100        }
1101        Statement::ShowFlows(stmt) => {
1102            validate_db_permission!(stmt, query_ctx);
1103        }
1104        #[cfg(feature = "enterprise")]
1105        Statement::ShowTriggers(_stmt) => {
1106            // The trigger is organized based on the catalog dimension, so there
1107            // is no need to check the permission of the database(schema).
1108        }
1109        Statement::ShowStatus(_stmt) => {}
1110        Statement::ShowSearchPath(_stmt) => {}
1111        Statement::DescribeTable(stmt) => {
1112            validate_param(stmt.name(), query_ctx)?;
1113        }
1114        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
1115            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
1116            CopyTable::From(copy_table_from) => {
1117                validate_param(&copy_table_from.table_name, query_ctx)?
1118            }
1119        },
1120        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
1121            match copy_database {
1122                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1123                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1124            }
1125        }
1126        Statement::TruncateTable(stmt) => {
1127            validate_param(stmt.table_name(), query_ctx)?;
1128        }
1129        // cursor operations are always allowed once it's created
1130        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
1131        // User can only kill process in their own catalog.
1132        Statement::Kill(_) => {}
1133        // SHOW PROCESSLIST
1134        Statement::ShowProcesslist(_) => {}
1135    }
1136    Ok(())
1137}
1138
1139fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1140    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1141        .map_err(BoxedError::new)
1142        .context(ExternalSnafu)?;
1143
1144    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1145        .map_err(BoxedError::new)
1146        .context(SqlExecInterceptedSnafu)
1147}
1148
1149fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1150    let catalog = match &name.0[..] {
1151        [_flow] => query_ctx.current_catalog().to_string(),
1152        [catalog, _flow] => catalog.to_string_unquoted(),
1153        _ => {
1154            return InvalidSqlSnafu {
1155                err_msg: format!(
1156                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1157                ),
1158            }
1159            .fail();
1160        }
1161    };
1162
1163    let schema = query_ctx.current_schema();
1164
1165    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1166        .map_err(BoxedError::new)
1167        .context(SqlExecInterceptedSnafu)
1168}
1169
1170fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1171    let (catalog, schema) = match &name.0[..] {
1172        [schema] => (
1173            query_ctx.current_catalog().to_string(),
1174            schema.to_string_unquoted(),
1175        ),
1176        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1177        _ => InvalidSqlSnafu {
1178            err_msg: format!(
1179                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1180            ),
1181        }
1182        .fail()?,
1183    };
1184
1185    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1186        .map_err(BoxedError::new)
1187        .context(SqlExecInterceptedSnafu)
1188}
1189
1190fn is_readonly_plan(plan: &LogicalPlan) -> bool {
1191    !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
1192}
1193
1194fn should_track_statement_process(stmt: &Statement) -> bool {
1195    stmt.is_readonly()
1196        || matches!(stmt, Statement::Insert(insert) if insert.has_non_values_query_source())
1197}
1198
1199fn should_track_plan_process(stmt: Option<&Statement>, plan: &LogicalPlan) -> bool {
1200    is_readonly_plan(plan)
1201        || matches!(stmt, Some(Statement::Insert(insert)) if insert.has_non_values_query_source())
1202}
1203
1204#[cfg(test)]
1205mod tests {
1206    use std::collections::HashMap;
1207    use std::future::Future;
1208    use std::pin::Pin;
1209    use std::sync::atomic::{AtomicBool, Ordering};
1210    use std::sync::{Arc, Barrier};
1211    use std::task::{Context, Poll};
1212    use std::thread;
1213    use std::time::{Duration, Instant};
1214
1215    use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
1216    use catalog::process_manager::ProcessManager;
1217    use common_base::Plugins;
1218    use common_error::ext::{BoxedError, PlainError};
1219    use common_error::status_code::StatusCode;
1220    use common_meta::cache::LayeredCacheRegistryBuilder;
1221    use common_meta::kv_backend::memory::MemoryKvBackend;
1222    use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
1223    use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
1224    use common_meta::rpc::procedure::{
1225        MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
1226    };
1227    use common_query::Output;
1228    use common_recordbatch::{
1229        OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream,
1230    };
1231    use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1232    use datafusion_expr::dml::InsertOp;
1233    use datafusion_expr::{LogicalPlanBuilder, LogicalTableSource};
1234    use datatypes::prelude::ConcreteDataType;
1235    use datatypes::schema::{ColumnSchema, Schema as GtSchema, SchemaRef as GtSchemaRef};
1236    use query::query_engine::options::QueryOptions;
1237    use session::context::{Channel, ConnInfo, QueryContext, QueryContextBuilder};
1238    use snafu::{Location, Snafu};
1239    use sql::dialect::GreptimeDbDialect;
1240    use store_api::data_source::DataSource;
1241    use store_api::storage::ScanRequest;
1242    use strfmt::Format;
1243    use table::metadata::{FilterPushDownType, TableInfo, TableInfoBuilder, TableMetaBuilder};
1244    use table::test_util::EmptyTable;
1245    use table::{Table, TableRef};
1246    use tokio::sync::{mpsc, oneshot};
1247
1248    use super::*;
1249    use crate::frontend::FrontendOptions;
1250    use crate::instance::builder::FrontendBuilder;
1251
    /// Errors raised by the test helpers in this module.
    ///
    /// Each variant corresponds to one step of building a test frontend or of
    /// driving an insert-select scenario; the display text names the failed step.
    #[derive(Debug, Snafu)]
    enum TestError {
        // --- Test fixture construction ---
        #[snafu(display("Failed to build test cache registry"))]
        BuildCacheRegistry {
            source: cache::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to build test table meta for table: {table_name}"))]
        BuildTableMeta {
            table_name: String,
            source: table::metadata::TableMetaBuilderError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to build test table info for table: {table_name}"))]
        BuildTableInfo {
            table_name: String,
            source: table::metadata::TableInfoBuilderError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to register test table: {table_name}"))]
        RegisterTable {
            table_name: String,
            source: catalog::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to build test frontend instance"))]
        BuildFrontend {
            source: crate::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        // --- SQL execution ---
        #[snafu(display("Expected exactly one output for SQL `{sql}`, got {actual}"))]
        UnexpectedOutputCount {
            sql: String,
            actual: usize,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to execute SQL `{sql}`"))]
        ExecuteSql {
            sql: String,
            source: crate::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        // --- Insert-select synchronization ---
        #[snafu(display("Timed out waiting for insert-select start notification"))]
        InsertStartTimeout {
            source: tokio::time::error::Elapsed,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Insert-select start notification channel closed"))]
        InsertStartChannelClosed {
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to release blocking insert-select interceptor"))]
        ReleaseBlockedInsert {
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Timed out waiting for insert-select source to be polled"))]
        SourcePollTimeout {
            source: tokio::time::error::Elapsed,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Insert-select source poll notification channel closed"))]
        SourcePollChannelClosed {
            source: oneshot::error::RecvError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Timed out waiting for insert task to finish"))]
        InsertTaskTimeout {
            source: tokio::time::error::Elapsed,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Insert task panicked"))]
        InsertTaskPanic {
            source: tokio::task::JoinError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Expected insert-select to be cancelled"))]
        InsertSelectNotCancelled {
            #[snafu(implicit)]
            location: Location,
        },
    }
1361
    /// Result alias for the test helpers in this module, with [`TestError`] as the error type.
    type TestResult<T> = std::result::Result<T, TestError>;
1363
1364    fn parse_one_sql(sql: &str) -> Statement {
1365        parse_stmt(sql, &GreptimeDbDialect {}).unwrap().remove(0)
1366    }
1367
1368    fn test_query_ctx(process_id: u32) -> QueryContextRef {
1369        Arc::new(
1370            QueryContextBuilder::default()
1371                .channel(Channel::Mysql)
1372                .conn_info(ConnInfo::new(None, Channel::Mysql))
1373                .process_id(process_id)
1374                .build(),
1375        )
1376    }
1377
    /// Test interceptor that blocks inside `pre_execute` for insert statements whose
    /// `has_non_values_query_source()` is true, until the test releases it.
    struct BlockingInsertSelectInterceptor {
        /// Receives a `()` right before the interceptor starts blocking.
        started_tx: mpsc::UnboundedSender<()>,
        /// Receiver the interceptor blocks on; it is taken out on first use.
        finish_rx: std::sync::Mutex<Option<oneshot::Receiver<()>>>,
    }
1382
1383    impl BlockingInsertSelectInterceptor {
1384        fn new(started_tx: mpsc::UnboundedSender<()>, finish_rx: oneshot::Receiver<()>) -> Self {
1385            Self {
1386                started_tx,
1387                finish_rx: std::sync::Mutex::new(Some(finish_rx)),
1388            }
1389        }
1390    }
1391
1392    impl SqlQueryInterceptor for BlockingInsertSelectInterceptor {
1393        type Error = Error;
1394
1395        fn pre_execute(
1396            &self,
1397            statement: &Statement,
1398            _plan: Option<&LogicalPlan>,
1399            _query_ctx: QueryContextRef,
1400        ) -> Result<()> {
1401            let Statement::Insert(insert) = statement else {
1402                return Ok(());
1403            };
1404            if !insert.has_non_values_query_source() {
1405                return Ok(());
1406            }
1407
1408            let finish_rx = self.finish_rx.lock().unwrap().take().unwrap();
1409            let _ = self.started_tx.send(());
1410            tokio::task::block_in_place(|| {
1411                tokio::runtime::Handle::current()
1412                    .block_on(finish_rx)
1413                    .unwrap();
1414            });
1415            Ok(())
1416        }
1417    }
1418
    /// Test record batch stream that reports its first poll and yields no batches:
    /// `poll_next` stays pending until `finish_rx` resolves, then ends the stream.
    struct PendingRecordBatchStream {
        /// Schema reported by `RecordBatchStream::schema`.
        schema: GtSchemaRef,
        /// Notified on the first `poll_next` call, then cleared.
        polled_tx: Option<oneshot::Sender<()>>,
        /// Sender half of `finish_rx`, stored but never used to send in this file;
        /// presumably kept so the receiver stays pending while the stream is alive.
        _finish_tx: oneshot::Sender<()>,
        /// Receiver polled by `poll_next` to decide whether the stream ends.
        finish_rx: Pin<Box<oneshot::Receiver<()>>>,
    }
1425
1426    impl RecordBatchStream for PendingRecordBatchStream {
1427        fn schema(&self) -> GtSchemaRef {
1428            self.schema.clone()
1429        }
1430
1431        fn output_ordering(&self) -> Option<&[OrderOption]> {
1432            None
1433        }
1434
1435        fn metrics(&self) -> Option<common_recordbatch::adapter::RecordBatchMetrics> {
1436            None
1437        }
1438    }
1439
1440    impl Stream for PendingRecordBatchStream {
1441        type Item = common_recordbatch::error::Result<RecordBatch>;
1442
1443        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1444            if let Some(polled_tx) = self.polled_tx.take() {
1445                let _ = polled_tx.send(());
1446            }
1447
1448            match self.finish_rx.as_mut().poll(cx) {
1449                Poll::Ready(_) => Poll::Ready(None),
1450                Poll::Pending => Poll::Pending,
1451            }
1452        }
1453    }
1454
    // Explicitly marks the stream as `Unpin`; `poll_next` mutates its fields through
    // `Pin<&mut Self>`, which relies on the type being `Unpin`.
    impl Unpin for PendingRecordBatchStream {}
1456
    /// Test data source whose `get_stream` hands out a [`PendingRecordBatchStream`].
    struct PendingDataSource {
        /// Schema given to every stream created by this source.
        schema: GtSchemaRef,
        /// First-poll notifier; moved into the first stream created, so later
        /// streams get `None`.
        polled_tx: std::sync::Mutex<Option<oneshot::Sender<()>>>,
    }
1461
1462    impl DataSource for PendingDataSource {
1463        fn get_stream(
1464            &self,
1465            _request: ScanRequest,
1466        ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
1467            let (finish_tx, finish_rx) = oneshot::channel();
1468            let mut polled_tx = self.polled_tx.lock().map_err(|_| {
1469                BoxedError::new(PlainError::new(
1470                    "pending data source lock poisoned".to_string(),
1471                    StatusCode::Unexpected,
1472                ))
1473            })?;
1474            Ok(Box::pin(PendingRecordBatchStream {
1475                schema: self.schema.clone(),
1476                polled_tx: polled_tx.take(),
1477                _finish_tx: finish_tx,
1478                finish_rx: Box::pin(finish_rx),
1479            }))
1480        }
1481    }
1482
    /// Procedure executor stub for tests; its `ProcedureExecutor` impl in this
    /// module fails every operation with an `Unsupported` error.
    struct NoopProcedureExecutor;
1484
1485    #[async_trait::async_trait]
1486    impl ProcedureExecutor for NoopProcedureExecutor {
1487        async fn submit_ddl_task(
1488            &self,
1489            _ctx: &ExecutorContext,
1490            _request: SubmitDdlTaskRequest,
1491        ) -> common_meta::error::Result<SubmitDdlTaskResponse> {
1492            common_meta::error::UnsupportedSnafu {
1493                operation: "submit_ddl_task",
1494            }
1495            .fail()
1496        }
1497
1498        async fn migrate_region(
1499            &self,
1500            _ctx: &ExecutorContext,
1501            _request: MigrateRegionRequest,
1502        ) -> common_meta::error::Result<MigrateRegionResponse> {
1503            common_meta::error::UnsupportedSnafu {
1504                operation: "migrate_region",
1505            }
1506            .fail()
1507        }
1508
1509        async fn reconcile(
1510            &self,
1511            _ctx: &ExecutorContext,
1512            _request: ReconcileRequest,
1513        ) -> common_meta::error::Result<ReconcileResponse> {
1514            common_meta::error::UnsupportedSnafu {
1515                operation: "reconcile",
1516            }
1517            .fail()
1518        }
1519
1520        async fn query_procedure_state(
1521            &self,
1522            _ctx: &ExecutorContext,
1523            _pid: &str,
1524        ) -> common_meta::error::Result<ProcedureStateResponse> {
1525            common_meta::error::UnsupportedSnafu {
1526                operation: "query_procedure_state",
1527            }
1528            .fail()
1529        }
1530
1531        async fn list_procedures(
1532            &self,
1533            _ctx: &ExecutorContext,
1534        ) -> common_meta::error::Result<ProcedureDetailResponse> {
1535            common_meta::error::UnsupportedSnafu {
1536                operation: "list_procedures",
1537            }
1538            .fail()
1539        }
1540    }
1541
1542    fn test_cache_registry(
1543        kv_backend: common_meta::kv_backend::KvBackendRef,
1544    ) -> TestResult<common_meta::cache::LayeredCacheRegistryRef> {
1545        Ok(Arc::new(
1546            cache::with_default_composite_cache_registry(
1547                LayeredCacheRegistryBuilder::default()
1548                    .add_cache_registry(cache::build_fundamental_cache_registry(kv_backend)),
1549            )
1550            .context(BuildCacheRegistrySnafu)?
1551            .build(),
1552        ))
1553    }
1554
1555    fn test_table_info(table_id: u32, table_name: &str) -> TestResult<TableInfo> {
1556        let schema = Arc::new(GtSchema::new(vec![
1557            ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
1558            ColumnSchema::new(
1559                "ts",
1560                ConcreteDataType::timestamp_millisecond_datatype(),
1561                false,
1562            )
1563            .with_time_index(true),
1564        ]));
1565        let table_meta = TableMetaBuilder::empty()
1566            .schema(schema)
1567            .primary_key_indices(vec![0])
1568            .value_indices(vec![1])
1569            .next_column_id(1024)
1570            .build()
1571            .with_context(|_| BuildTableMetaSnafu {
1572                table_name: table_name.to_string(),
1573            })?;
1574
1575        TableInfoBuilder::new(table_name, table_meta)
1576            .table_id(table_id)
1577            .build()
1578            .with_context(|_| BuildTableInfoSnafu {
1579                table_name: table_name.to_string(),
1580            })
1581    }
1582
1583    fn test_table(table_id: u32, table_name: &str) -> TestResult<table::TableRef> {
1584        let table_info = test_table_info(table_id, table_name)?;
1585        Ok(EmptyTable::from_table_info(&table_info))
1586    }
1587
1588    fn pending_table(
1589        table_id: u32,
1590        table_name: &str,
1591        polled_tx: oneshot::Sender<()>,
1592    ) -> TestResult<table::TableRef> {
1593        let table_info = test_table_info(table_id, table_name)?;
1594        let data_source = Arc::new(PendingDataSource {
1595            schema: table_info.meta.schema.clone(),
1596            polled_tx: std::sync::Mutex::new(Some(polled_tx)),
1597        });
1598
1599        Ok(Arc::new(Table::new(
1600            Arc::new(table_info),
1601            FilterPushDownType::Unsupported,
1602            data_source,
1603        )))
1604    }
1605
1606    async fn test_instance_with_tables(
1607        source_table: TableRef,
1608        target_table: TableRef,
1609    ) -> TestResult<Instance> {
1610        test_instance_with_plugins(source_table, target_table, Plugins::new()).await
1611    }
1612
1613    async fn test_instance_with_insert_select_interceptor(
1614        interceptor: SqlQueryInterceptorRef<Error>,
1615    ) -> TestResult<Instance> {
1616        let plugins = Plugins::new();
1617        plugins.insert::<SqlQueryInterceptorRef<Error>>(interceptor);
1618
1619        test_instance_with_plugins(
1620            test_table(1024, "source")?,
1621            test_table(1025, "target")?,
1622            plugins,
1623        )
1624        .await
1625    }
1626
1627    async fn test_instance_with_plugins(
1628        source_table: TableRef,
1629        target_table: TableRef,
1630        plugins: Plugins,
1631    ) -> TestResult<Instance> {
1632        let kv_backend = Arc::new(MemoryKvBackend::new());
1633        let process_manager = Arc::new(ProcessManager::new("test-frontend".to_string(), None));
1634        let catalog_manager = catalog::memory::MemoryCatalogManager::new_with_table(source_table);
1635        let target_table_name = "target";
1636        catalog_manager
1637            .register_table_sync(catalog::RegisterTableRequest {
1638                catalog: "greptime".to_string(),
1639                schema: "public".to_string(),
1640                table_name: target_table_name.to_string(),
1641                table_id: 1025,
1642                table: target_table,
1643            })
1644            .with_context(|_| RegisterTableSnafu {
1645                table_name: target_table_name.to_string(),
1646            })?;
1647        catalog_manager.register_process_list_table(process_manager.clone());
1648
1649        let cache_registry = test_cache_registry(kv_backend.clone())?;
1650
1651        FrontendBuilder::new(
1652            FrontendOptions::default(),
1653            kv_backend,
1654            cache_registry,
1655            catalog_manager,
1656            Arc::new(client::client_manager::NodeClients::default()),
1657            Arc::new(NoopProcedureExecutor),
1658            process_manager,
1659        )
1660        .with_plugin(plugins)
1661        .try_build()
1662        .await
1663        .context(BuildFrontendSnafu)
1664    }
1665
1666    async fn execute_one_sql(
1667        instance: &Instance,
1668        sql: &str,
1669        query_ctx: QueryContextRef,
1670    ) -> TestResult<Output> {
1671        let mut results = instance.do_query_inner(sql, query_ctx).await;
1672        ensure!(
1673            results.len() == 1,
1674            UnexpectedOutputCountSnafu {
1675                sql: sql.to_string(),
1676                actual: results.len(),
1677            }
1678        );
1679        results.remove(0).with_context(|_| ExecuteSqlSnafu {
1680            sql: sql.to_string(),
1681        })
1682    }
1683
1684    #[test]
1685    fn test_fast_legacy_check_deadlock_prevention() {
1686        // Create a DashMap to simulate the cache
1687        let cache = DashMap::new();
1688
1689        // Pre-populate cache with some entries
1690        cache.insert("metric1".to_string(), true); // legacy mode
1691        cache.insert("metric2".to_string(), false); // prom mode
1692        cache.insert("metric3".to_string(), true); // legacy mode
1693
1694        // Test case 1: Normal operation with cache hits
1695        let metric1 = "metric1".to_string();
1696        let metric4 = "metric4".to_string();
1697        let names1 = vec![&metric1, &metric4];
1698        let result = fast_legacy_check(&cache, &names1);
1699        assert!(result.is_ok());
1700        assert_eq!(result.unwrap(), Some(true)); // should return legacy mode
1701
1702        // Verify that metric4 was added to cache
1703        assert!(cache.contains_key("metric4"));
1704        assert!(*cache.get("metric4").unwrap().value());
1705
1706        // Test case 2: No cache hits
1707        let metric5 = "metric5".to_string();
1708        let metric6 = "metric6".to_string();
1709        let names2 = vec![&metric5, &metric6];
1710        let result = fast_legacy_check(&cache, &names2);
1711        assert!(result.is_ok());
1712        assert_eq!(result.unwrap(), None); // should return None as no cache hits
1713
1714        // Test case 3: Incompatible modes should return error
1715        let cache_incompatible = DashMap::new();
1716        cache_incompatible.insert("metric1".to_string(), true); // legacy
1717        cache_incompatible.insert("metric2".to_string(), false); // prom
1718        let metric1_test = "metric1".to_string();
1719        let metric2_test = "metric2".to_string();
1720        let names3 = vec![&metric1_test, &metric2_test];
1721        let result = fast_legacy_check(&cache_incompatible, &names3);
1722        assert!(result.is_err()); // should error due to incompatible modes
1723
1724        // Test case 4: Intensive concurrent access to test deadlock prevention
1725        // This test specifically targets the scenario where multiple threads
1726        // access the same cache entries simultaneously
1727        let cache_concurrent = Arc::new(DashMap::new());
1728        cache_concurrent.insert("shared_metric".to_string(), true);
1729
1730        let num_threads = 8;
1731        let operations_per_thread = 100;
1732        let barrier = Arc::new(Barrier::new(num_threads));
1733        let success_flag = Arc::new(AtomicBool::new(true));
1734
1735        let handles: Vec<_> = (0..num_threads)
1736            .map(|thread_id| {
1737                let cache_clone = Arc::clone(&cache_concurrent);
1738                let barrier_clone = Arc::clone(&barrier);
1739                let success_flag_clone = Arc::clone(&success_flag);
1740
1741                thread::spawn(move || {
1742                    // Wait for all threads to be ready
1743                    barrier_clone.wait();
1744
1745                    let start_time = Instant::now();
1746                    for i in 0..operations_per_thread {
1747                        // Each operation references existing cache entry and adds new ones
1748                        let shared_metric = "shared_metric".to_string();
1749                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
1750                        let names = vec![&shared_metric, &new_metric];
1751
1752                        match fast_legacy_check(&cache_clone, &names) {
1753                            Ok(_) => {}
1754                            Err(_) => {
1755                                success_flag_clone.store(false, Ordering::Relaxed);
1756                                return;
1757                            }
1758                        }
1759
1760                        // If the test takes too long, it likely means deadlock
1761                        if start_time.elapsed() > Duration::from_secs(10) {
1762                            success_flag_clone.store(false, Ordering::Relaxed);
1763                            return;
1764                        }
1765                    }
1766                })
1767            })
1768            .collect();
1769
1770        // Join all threads with timeout
1771        let start_time = Instant::now();
1772        for (i, handle) in handles.into_iter().enumerate() {
1773            let join_result = handle.join();
1774
1775            // Check if we're taking too long (potential deadlock)
1776            if start_time.elapsed() > Duration::from_secs(30) {
1777                panic!("Test timed out - possible deadlock detected!");
1778            }
1779
1780            if join_result.is_err() {
1781                panic!("Thread {} panicked during execution", i);
1782            }
1783        }
1784
1785        // Verify all operations completed successfully
1786        assert!(
1787            success_flag.load(Ordering::Relaxed),
1788            "Some operations failed"
1789        );
1790
1791        // Verify that many new entries were added (proving operations completed)
1792        let final_count = cache_concurrent.len();
1793        assert!(
1794            final_count > 1 + num_threads * operations_per_thread / 2,
1795            "Expected more cache entries, got {}",
1796            final_count
1797        );
1798    }
1799
1800    #[test]
1801    fn test_should_track_statement_process() {
1802        assert!(should_track_statement_process(&parse_one_sql(
1803            "SELECT * FROM demo"
1804        )));
1805        assert!(should_track_statement_process(&parse_one_sql(
1806            "INSERT INTO demo SELECT * FROM source"
1807        )));
1808        assert!(!should_track_statement_process(&parse_one_sql(
1809            "INSERT INTO demo VALUES (1)"
1810        )));
1811        assert!(!should_track_statement_process(&parse_one_sql(
1812            "INSERT INTO demo VALUES (now())"
1813        )));
1814    }
1815
1816    #[test]
1817    fn test_should_track_plan_process() {
1818        let select_stmt = parse_one_sql("SELECT * FROM demo");
1819        let insert_select_stmt = parse_one_sql("INSERT INTO demo SELECT * FROM source");
1820        let insert_values_stmt = parse_one_sql("INSERT INTO demo VALUES (now())");
1821
1822        let empty_plan = LogicalPlanBuilder::empty(false).build().unwrap();
1823        assert!(should_track_plan_process(Some(&select_stmt), &empty_plan));
1824        assert!(should_track_plan_process(
1825            Some(&insert_select_stmt),
1826            &insert_dml_plan()
1827        ));
1828        assert!(!should_track_plan_process(
1829            Some(&insert_values_stmt),
1830            &insert_dml_plan()
1831        ));
1832        assert!(!should_track_plan_process(None, &insert_dml_plan()));
1833    }
1834
1835    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1836    async fn test_insert_select_is_visible_in_show_processlist() -> TestResult<()> {
1837        let insert_sql = "INSERT INTO target SELECT * FROM source";
1838        let (started_tx, mut started_rx) = mpsc::unbounded_channel();
1839        let (finish_tx, finish_rx) = oneshot::channel();
1840        let interceptor = Arc::new(BlockingInsertSelectInterceptor::new(started_tx, finish_rx));
1841        let instance = Arc::new(test_instance_with_insert_select_interceptor(interceptor).await?);
1842
1843        let insert_task = tokio::spawn({
1844            let instance = instance.clone();
1845            async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await }
1846        });
1847
1848        tokio::time::timeout(Duration::from_secs(5), started_rx.recv())
1849            .await
1850            .context(InsertStartTimeoutSnafu)?
1851            .context(InsertStartChannelClosedSnafu)?;
1852
1853        let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43)).await?;
1854        let process_list = output.data.pretty_print().await;
1855        assert!(
1856            process_list.contains(insert_sql),
1857            "process list did not contain running insert:\n{process_list}"
1858        );
1859
1860        finish_tx
1861            .send(())
1862            .map_err(|_| ReleaseBlockedInsertSnafu.build())?;
1863        insert_task.await.context(InsertTaskPanicSnafu)??;
1864
1865        Ok(())
1866    }
1867
1868    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1869    async fn test_kill_query_cancels_insert_select() -> TestResult<()> {
1870        assert_kill_cancels_insert_select("KILL QUERY 4242").await
1871    }
1872
1873    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1874    async fn test_kill_process_id_cancels_insert_select() -> TestResult<()> {
1875        assert_kill_cancels_insert_select("KILL 'test-frontend/4242'").await
1876    }
1877
1878    async fn assert_kill_cancels_insert_select(kill_sql: &str) -> TestResult<()> {
1879        let insert_sql = "INSERT INTO target SELECT * FROM source";
1880        let (source_polled_tx, source_polled_rx) = oneshot::channel();
1881        let instance = Arc::new(
1882            test_instance_with_tables(
1883                pending_table(1024, "source", source_polled_tx)?,
1884                test_table(1025, "target")?,
1885            )
1886            .await?,
1887        );
1888
1889        let insert_task = tokio::spawn({
1890            let instance = instance.clone();
1891            async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await }
1892        });
1893
1894        tokio::time::timeout(Duration::from_secs(5), source_polled_rx)
1895            .await
1896            .context(SourcePollTimeoutSnafu)?
1897            .context(SourcePollChannelClosedSnafu)?;
1898
1899        let output = execute_one_sql(&instance, kill_sql, test_query_ctx(43)).await?;
1900        assert!(matches!(output.data, OutputData::AffectedRows(1)));
1901
1902        let insert_result = tokio::time::timeout(Duration::from_secs(5), insert_task)
1903            .await
1904            .context(InsertTaskTimeoutSnafu)?
1905            .context(InsertTaskPanicSnafu)?;
1906        let err = match insert_result {
1907            Ok(_) => return InsertSelectNotCancelledSnafu.fail(),
1908            Err(TestError::ExecuteSql { source, .. }) => source,
1909            Err(err) => return Err(err),
1910        };
1911        assert_eq!(StatusCode::Cancelled, err.status_code());
1912
1913        let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43)).await?;
1914        let process_list = output.data.pretty_print().await;
1915        assert!(
1916            !process_list.contains(insert_sql),
1917            "process list still contains killed insert:\n{process_list}"
1918        );
1919
1920        Ok(())
1921    }
1922
1923    fn insert_dml_plan() -> LogicalPlan {
1924        let schema = SchemaRef::new(Schema::new(vec![Field::new(
1925            "value",
1926            DataType::Int64,
1927            true,
1928        )]));
1929        let target = Arc::new(LogicalTableSource::new(schema));
1930        let input = LogicalPlanBuilder::empty(false).build().unwrap();
1931
1932        LogicalPlanBuilder::insert_into(input, "demo", target, InsertOp::Append)
1933            .unwrap()
1934            .build()
1935            .unwrap()
1936    }
1937
1938    #[test]
1939    fn test_exec_validation() {
1940        let query_ctx = QueryContext::arc();
1941        let plugins: Plugins = Plugins::new();
1942        plugins.insert(QueryOptions {
1943            disallow_cross_catalog_query: true,
1944        });
1945
1946        let sql = r#"
1947        SELECT * FROM demo;
1948        EXPLAIN SELECT * FROM demo;
1949        CREATE DATABASE test_database;
1950        SHOW DATABASES;
1951        "#;
1952        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
1953        assert_eq!(stmts.len(), 4);
1954        for stmt in stmts {
1955            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
1956            re.unwrap();
1957        }
1958
1959        let sql = r#"
1960        SHOW CREATE TABLE demo;
1961        ALTER TABLE demo ADD COLUMN new_col INT;
1962        "#;
1963        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
1964        assert_eq!(stmts.len(), 2);
1965        for stmt in stmts {
1966            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
1967            re.unwrap();
1968        }
1969
1970        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
1971            // test right
1972            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
1973            for (catalog, schema) in right {
1974                let sql = do_fmt(template_sql, catalog, schema);
1975                do_test(&sql, plugins.clone(), query_ctx, true);
1976            }
1977
1978            let wrong = vec![
1979                ("wrongcatalog.", "public."),
1980                ("wrongcatalog.", "wrongschema."),
1981            ];
1982            for (catalog, schema) in wrong {
1983                let sql = do_fmt(template_sql, catalog, schema);
1984                do_test(&sql, plugins.clone(), query_ctx, false);
1985            }
1986        }
1987
1988        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
1989            let vars = HashMap::from([
1990                ("catalog".to_string(), catalog),
1991                ("schema".to_string(), schema),
1992            ]);
1993            template.format(&vars).unwrap()
1994        }
1995
1996        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
1997            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
1998            let re = check_permission(plugins, stmt, query_ctx);
1999            if is_ok {
2000                re.unwrap();
2001            } else {
2002                assert!(re.is_err());
2003            }
2004        }
2005
2006        // test insert
2007        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
2008        replace_test(sql, plugins.clone(), &query_ctx);
2009
2010        // test create table
2011        let sql = r#"CREATE TABLE {catalog}{schema}demo(
2012                            host STRING,
2013                            ts TIMESTAMP,
2014                            TIME INDEX (ts),
2015                            PRIMARY KEY(host)
2016                        ) engine=mito;"#;
2017        replace_test(sql, plugins.clone(), &query_ctx);
2018
2019        // test drop table
2020        let sql = "DROP TABLE {catalog}{schema}demo;";
2021        replace_test(sql, plugins.clone(), &query_ctx);
2022
2023        // test show tables
2024        let sql = "SHOW TABLES FROM public";
2025        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
2026        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();
2027
2028        let sql = "SHOW TABLES FROM private";
2029        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
2030        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
2031        assert!(re.is_ok());
2032
2033        // test describe table
2034        let sql = "DESC TABLE {catalog}{schema}demo;";
2035        replace_test(sql, plugins.clone(), &query_ctx);
2036
2037        let comment_flow_cases = [
2038            ("COMMENT ON FLOW my_flow IS 'comment';", true),
2039            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
2040            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
2041        ];
2042        for (sql, is_ok) in comment_flow_cases {
2043            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
2044            let result = check_permission(plugins.clone(), stmt, &query_ctx);
2045            assert_eq!(result.is_ok(), is_ok);
2046        }
2047
2048        let show_flow_cases = [
2049            ("SHOW CREATE FLOW my_flow;", true),
2050            ("SHOW CREATE FLOW greptime.my_flow;", true),
2051            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
2052        ];
2053        for (sql, is_ok) in show_flow_cases {
2054            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
2055            let result = check_permission(plugins.clone(), stmt, &query_ctx);
2056            assert_eq!(result.is_ok(), is_ok);
2057        }
2058    }
2059}