// frontend/instance.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15pub mod builder;
16mod dashboard;
17mod grpc;
18mod influxdb;
19mod jaeger;
20mod log_handler;
21mod logs;
22mod opentsdb;
23mod otlp;
24pub mod prom_store;
25mod promql;
26mod region_query;
27pub mod standalone;
28
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, SystemTime};

use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use catalog::process_manager::{
    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
};
use client::OutputData;
use common_base::Plugins;
use common_base::cancellation::CancellableFuture;
use common_error::ext::{BoxedError, ErrorExt};
use common_event_recorder::EventRecorderRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_name::TableNameKey;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_recordbatch::RecordBatchStreamWrapper;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_telemetry::logging::SlowQueryOptions;
use common_telemetry::{debug, error, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
use query::QueryEngineRef;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
use servers::error::{
    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
};
use servers::interceptor::{
    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::otlp::metrics::legacy_normalize_otlp_name;
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use snafu::prelude::*;
use sql::ast::ObjectNamePartExt;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::comment::CommentObject;
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
use tracing::Span;

use crate::error::{
    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
    StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::service_config::InfluxdbMergeMode;
use crate::stream_wrapper::CancellableStreamWrapper;

105lazy_static! {
106    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
107}
108
109/// The frontend instance contains necessary components, and implements many
110/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
111/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
112#[derive(Clone)]
113pub struct Instance {
114    frontend_peer_addr: String,
115    catalog_manager: CatalogManagerRef,
116    pipeline_operator: Arc<PipelineOperator>,
117    statement_executor: Arc<StatementExecutor>,
118    query_engine: QueryEngineRef,
119    plugins: Plugins,
120    inserter: InserterRef,
121    deleter: DeleterRef,
122    table_metadata_manager: TableMetadataManagerRef,
123    event_recorder: Option<EventRecorderRef>,
124    process_manager: ProcessManagerRef,
125    slow_query_options: SlowQueryOptions,
126    influxdb_default_merge_mode: InfluxdbMergeMode,
127    suspend: Arc<AtomicBool>,
128
129    // cache for otlp metrics
130    // first layer key: db-string
131    // key: direct input metric name
132    // value: if runs in legacy mode
133    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
134}
135
136impl Instance {
137    pub fn frontend_peer_addr(&self) -> &str {
138        &self.frontend_peer_addr
139    }
140
141    pub fn catalog_manager(&self) -> &CatalogManagerRef {
142        &self.catalog_manager
143    }
144
145    pub fn query_engine(&self) -> &QueryEngineRef {
146        &self.query_engine
147    }
148
149    pub fn plugins(&self) -> &Plugins {
150        &self.plugins
151    }
152
153    pub fn statement_executor(&self) -> &StatementExecutorRef {
154        &self.statement_executor
155    }
156
157    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
158        &self.table_metadata_manager
159    }
160
161    pub fn inserter(&self) -> &InserterRef {
162        &self.inserter
163    }
164
165    pub fn process_manager(&self) -> &ProcessManagerRef {
166        &self.process_manager
167    }
168
169    pub fn node_manager(&self) -> &NodeManagerRef {
170        self.inserter.node_manager()
171    }
172
173    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
174        self.inserter.partition_manager()
175    }
176
177    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
178        self.statement_executor.cache_invalidator()
179    }
180
181    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
182        self.statement_executor.procedure_executor()
183    }
184
185    pub fn suspend_state(&self) -> Arc<AtomicBool> {
186        self.suspend.clone()
187    }
188
189    pub(crate) fn is_suspended(&self) -> bool {
190        self.suspend.load(atomic::Ordering::Relaxed)
191    }
192}
193
194fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
195    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
196}
197
198impl Instance {
199    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
200        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
201
202        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
203        let query_interceptor = query_interceptor.as_ref();
204
205        if stmt.is_readonly() {
206            let slow_query_timer = self
207                .slow_query_options
208                .enable
209                .then(|| self.event_recorder.clone())
210                .flatten()
211                .map(|event_recorder| {
212                    SlowQueryTimer::new(
213                        CatalogQueryStatement::Sql(stmt.clone()),
214                        self.slow_query_options.threshold,
215                        self.slow_query_options.sample_ratio,
216                        self.slow_query_options.record_type,
217                        event_recorder,
218                    )
219                });
220
221            let ticket = self.process_manager.register_query(
222                query_ctx.current_catalog().to_string(),
223                vec![query_ctx.current_schema()],
224                stmt.to_string(),
225                query_ctx.conn_info().to_string(),
226                Some(query_ctx.process_id()),
227                slow_query_timer,
228            );
229
230            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
231
232            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
233                .await
234                .map_err(|_| error::CancelledSnafu.build())?
235                .map(|output| {
236                    let Output { meta, data } = output;
237
238                    let data = match data {
239                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
240                            CancellableStreamWrapper::new(stream, ticket),
241                        )),
242                        other => other,
243                    };
244                    Output { data, meta }
245                })
246        } else {
247            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
248                .await
249        }
250    }
251
252    async fn exec_statement_with_timeout(
253        &self,
254        stmt: Statement,
255        query_ctx: QueryContextRef,
256        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
257    ) -> Result<Output> {
258        let timeout = derive_timeout(&stmt, &query_ctx);
259        match timeout {
260            Some(timeout) => {
261                let start = tokio::time::Instant::now();
262                let output = tokio::time::timeout(
263                    timeout,
264                    self.exec_statement(stmt, query_ctx, query_interceptor),
265                )
266                .await
267                .map_err(|_| StatementTimeoutSnafu.build())??;
268                // compute remaining timeout
269                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
270                attach_timeout(output, remaining_timeout)
271            }
272            None => {
273                self.exec_statement(stmt, query_ctx, query_interceptor)
274                    .await
275            }
276        }
277    }
278
279    async fn exec_statement(
280        &self,
281        stmt: Statement,
282        query_ctx: QueryContextRef,
283        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
284    ) -> Result<Output> {
285        match stmt {
286            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
287                // TODO: remove this when format is supported in datafusion
288                if let Statement::Explain(explain) = &stmt
289                    && let Some(format) = explain.format()
290                {
291                    query_ctx.set_explain_format(format.to_string());
292                }
293
294                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
295                    .await
296            }
297            Statement::Tql(tql) => {
298                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
299                    .await
300            }
301            _ => {
302                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
303                self.statement_executor
304                    .execute_sql(stmt, query_ctx)
305                    .await
306                    .context(TableOperationSnafu)
307            }
308        }
309    }
310
311    async fn plan_and_exec_sql(
312        &self,
313        stmt: Statement,
314        query_ctx: &QueryContextRef,
315        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
316    ) -> Result<Output> {
317        let stmt = QueryStatement::Sql(stmt);
318        let plan = self
319            .statement_executor
320            .plan(&stmt, query_ctx.clone())
321            .await?;
322        let QueryStatement::Sql(stmt) = stmt else {
323            unreachable!()
324        };
325        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
326
327        self.statement_executor
328            .exec_plan(plan, query_ctx.clone())
329            .await
330            .context(TableOperationSnafu)
331    }
332
333    async fn plan_and_exec_tql(
334        &self,
335        query_ctx: &QueryContextRef,
336        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
337        tql: Tql,
338    ) -> Result<Output> {
339        let plan = self
340            .statement_executor
341            .plan_tql(tql.clone(), query_ctx)
342            .await?;
343        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
344        self.statement_executor
345            .exec_plan(plan, query_ctx.clone())
346            .await
347            .context(TableOperationSnafu)
348    }
349
350    async fn check_otlp_legacy(
351        &self,
352        names: &[&String],
353        ctx: QueryContextRef,
354    ) -> server_error::Result<bool> {
355        let db_string = ctx.get_db_string();
356        // fast cache check
357        let cache = self
358            .otlp_metrics_table_legacy_cache
359            .entry(db_string.clone())
360            .or_default();
361        if let Some(flag) = fast_legacy_check(&cache, names)? {
362            return Ok(flag);
363        }
364        // release cache reference to avoid lock contention
365        drop(cache);
366
367        let catalog = ctx.current_catalog();
368        let schema = ctx.current_schema();
369
370        // query legacy table names
371        let normalized_names = names
372            .iter()
373            .map(|n| legacy_normalize_otlp_name(n))
374            .collect::<Vec<_>>();
375        let table_names = normalized_names
376            .iter()
377            .map(|n| TableNameKey::new(catalog, &schema, n))
378            .collect::<Vec<_>>();
379        let table_values = self
380            .table_metadata_manager()
381            .table_name_manager()
382            .batch_get(table_names)
383            .await
384            .context(CommonMetaSnafu)?;
385        let table_ids = table_values
386            .into_iter()
387            .filter_map(|v| v.map(|vi| vi.table_id()))
388            .collect::<Vec<_>>();
389
390        // means no existing table is found, use new mode
391        if table_ids.is_empty() {
392            let cache = self
393                .otlp_metrics_table_legacy_cache
394                .entry(db_string)
395                .or_default();
396            names.iter().for_each(|name| {
397                cache.insert((*name).clone(), false);
398            });
399            return Ok(false);
400        }
401
402        // has existing table, check table options
403        let table_infos = self
404            .table_metadata_manager()
405            .table_info_manager()
406            .batch_get(&table_ids)
407            .await
408            .context(CommonMetaSnafu)?;
409        let options = table_infos
410            .values()
411            .map(|info| {
412                info.table_info
413                    .meta
414                    .options
415                    .extra_options
416                    .get(OTLP_METRIC_COMPAT_KEY)
417                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
418            })
419            .collect::<Vec<_>>();
420        let cache = self
421            .otlp_metrics_table_legacy_cache
422            .entry(db_string)
423            .or_default();
424        if !options.is_empty() {
425            // check value consistency
426            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
427            let has_legacy = options
428                .iter()
429                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
430            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
431            let flag = has_legacy;
432            names.iter().for_each(|name| {
433                cache.insert((*name).clone(), flag);
434            });
435            Ok(flag)
436        } else {
437            // no table info, use new mode
438            names.iter().for_each(|name| {
439                cache.insert((*name).clone(), false);
440            });
441            Ok(false)
442        }
443    }
444}
445
446fn fast_legacy_check(
447    cache: &DashMap<String, bool>,
448    names: &[&String],
449) -> server_error::Result<Option<bool>> {
450    let hit_cache = names
451        .iter()
452        .filter_map(|name| cache.get(*name))
453        .collect::<Vec<_>>();
454    if !hit_cache.is_empty() {
455        let hit_legacy = hit_cache.iter().any(|en| *en.value());
456        let hit_prom = hit_cache.iter().any(|en| !*en.value());
457
458        // hit but have true and false, means both legacy and new mode are used
459        // we cannot handle this case, so return error
460        // add doc links in err msg later
461        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
462
463        let flag = hit_legacy;
464        // drop hit_cache to release references before inserting to avoid deadlock
465        drop(hit_cache);
466
467        // set cache for all names
468        names.iter().for_each(|name| {
469            if !cache.contains_key(*name) {
470                cache.insert((*name).clone(), flag);
471            }
472        });
473        Ok(Some(flag))
474    } else {
475        Ok(None)
476    }
477}
478
479/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
480/// For MySQL, it applies only to read-only statements.
481fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
482    let query_timeout = query_ctx.query_timeout()?;
483    if query_timeout.is_zero() {
484        return None;
485    }
486    match query_ctx.channel() {
487        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
488        Channel::Postgres => Some(query_timeout),
489        _ => None,
490    }
491}
492
493/// Derives timeout for plan execution.
494fn derive_timeout_for_plan(plan: &LogicalPlan, query_ctx: &QueryContextRef) -> Option<Duration> {
495    let query_timeout = query_ctx.query_timeout()?;
496    if query_timeout.is_zero() {
497        return None;
498    }
499    match query_ctx.channel() {
500        Channel::Mysql if is_readonly_plan(plan) => Some(query_timeout),
501        Channel::Postgres => Some(query_timeout),
502        _ => None,
503    }
504}
505
506fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
507    if timeout.is_zero() {
508        return StatementTimeoutSnafu.fail();
509    }
510
511    let output = match output.data {
512        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
513        OutputData::Stream(mut stream) => {
514            let schema = stream.schema();
515            let s = Box::pin(stream! {
516                let mut start = tokio::time::Instant::now();
517                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
518                    yield item;
519
520                    let now = tokio::time::Instant::now();
521                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
522                    start = now;
523                    // tokio::time::timeout may not return an error immediately when timeout is 0.
524                    if timeout.is_zero() {
525                        StreamTimeoutSnafu.fail()?;
526                    }
527                }
528            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
529            let stream = RecordBatchStreamWrapper {
530                schema,
531                stream: s,
532                output_ordering: None,
533                metrics: Default::default(),
534                span: Span::current(),
535            };
536            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
537        }
538    };
539
540    Ok(output)
541}
542
543impl Instance {
544    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
545    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
546        if self.is_suspended() {
547            return vec![error::SuspendedSnafu {}.fail()];
548        }
549
550        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
551        let query_interceptor = query_interceptor_opt.as_ref();
552        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
553            Ok(q) => q,
554            Err(e) => return vec![Err(e)],
555        };
556
557        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
558        let checker = checker_ref.as_ref();
559
560        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
561            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
562        {
563            Ok(stmts) => {
564                if stmts.is_empty() {
565                    return vec![
566                        InvalidSqlSnafu {
567                            err_msg: "empty statements",
568                        }
569                        .fail(),
570                    ];
571                }
572
573                let mut results = Vec::with_capacity(stmts.len());
574                for stmt in stmts {
575                    if let Err(e) = checker
576                        .check_permission(
577                            query_ctx.current_user(),
578                            PermissionReq::SqlStatement(&stmt),
579                        )
580                        .context(PermissionSnafu)
581                    {
582                        results.push(Err(e));
583                        break;
584                    }
585
586                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
587                        Ok(output) => {
588                            let output_result =
589                                query_interceptor.post_execute(output, query_ctx.clone());
590                            results.push(output_result);
591                        }
592                        Err(e) => {
593                            if e.status_code().should_log_error() {
594                                error!(e; "Failed to execute query: {stmt}");
595                            } else {
596                                debug!("Failed to execute query: {stmt}, {e}");
597                            }
598                            results.push(Err(e));
599                            break;
600                        }
601                    }
602                }
603                results
604            }
605            Err(e) => {
606                vec![Err(e)]
607            }
608        }
609    }
610
611    async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
612        self.query_engine
613            .execute(plan, query_ctx)
614            .await
615            .context(ExecLogicalPlanSnafu)
616    }
617
618    async fn exec_plan_with_timeout(
619        &self,
620        plan: LogicalPlan,
621        query_ctx: QueryContextRef,
622    ) -> Result<Output> {
623        let timeout = derive_timeout_for_plan(&plan, &query_ctx);
624        match timeout {
625            Some(timeout) => {
626                let start = tokio::time::Instant::now();
627                let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
628                    .await
629                    .map_err(|_| StatementTimeoutSnafu.build())??;
630                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
631                attach_timeout(output, remaining_timeout)
632            }
633            None => self.exec_plan(plan, query_ctx).await,
634        }
635    }
636
637    async fn do_exec_plan_inner(
638        &self,
639        plan: LogicalPlan,
640        stmt: Option<Statement>,
641        query_ctx: QueryContextRef,
642    ) -> Result<Output> {
643        ensure!(!self.is_suspended(), error::SuspendedSnafu);
644
645        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
646        let query_interceptor = query_interceptor_opt.as_ref();
647
648        if let Some(ref s) = stmt {
649            query_interceptor.pre_execute(s, Some(&plan), query_ctx.clone())?;
650        }
651
652        let query = stmt
653            .as_ref()
654            .map(|s| s.to_string())
655            .unwrap_or_else(|| plan.display_indent().to_string());
656
657        let result = if is_readonly_plan(&plan) {
658            let slow_query_timer = self
659                .slow_query_options
660                .enable
661                .then(|| self.event_recorder.clone())
662                .flatten()
663                .map(|event_recorder| {
664                    SlowQueryTimer::new(
665                        CatalogQueryStatement::Plan(query.clone()),
666                        self.slow_query_options.threshold,
667                        self.slow_query_options.sample_ratio,
668                        self.slow_query_options.record_type,
669                        event_recorder,
670                    )
671                });
672
673            let ticket = self.process_manager.register_query(
674                query_ctx.current_catalog().to_string(),
675                vec![query_ctx.current_schema()],
676                query,
677                query_ctx.conn_info().to_string(),
678                Some(query_ctx.process_id()),
679                slow_query_timer,
680            );
681
682            let query_fut = self.exec_plan_with_timeout(plan, query_ctx.clone());
683
684            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
685                .await
686                .map_err(|_| error::CancelledSnafu.build())?
687                .map(|output| {
688                    let Output { meta, data } = output;
689
690                    let data = match data {
691                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
692                            CancellableStreamWrapper::new(stream, ticket),
693                        )),
694                        other => other,
695                    };
696                    Output { data, meta }
697                })
698        } else {
699            self.exec_plan_with_timeout(plan, query_ctx.clone()).await
700        };
701
702        result.and_then(|output| query_interceptor.post_execute(output, query_ctx))
703    }
704
705    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
706    async fn do_promql_query_inner(
707        &self,
708        query: &PromQuery,
709        query_ctx: QueryContextRef,
710    ) -> Vec<Result<Output>> {
711        if self.is_suspended() {
712            return vec![error::SuspendedSnafu {}.fail()];
713        }
714
715        // check will be done in prometheus handler's do_query
716        let result = PrometheusHandler::do_query(self, query, query_ctx)
717            .await
718            .with_context(|_| ExecutePromqlSnafu {
719                query: format!("{query:?}"),
720            });
721        vec![result]
722    }
723
724    async fn do_describe_inner(
725        &self,
726        stmt: Statement,
727        query_ctx: QueryContextRef,
728    ) -> Result<Option<DescribeResult>> {
729        ensure!(!self.is_suspended(), error::SuspendedSnafu);
730
731        // EXPLAIN / EXPLAIN ANALYZE wrap an inner statement; describe them when the
732        // wrapped statement is something we already plan (so that bind parameters
733        // in the inner query get their types inferred). See #8029.
734        let is_inner_plannable = |s: &Statement| {
735            matches!(
736                s,
737                Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
738            )
739        };
740        let plannable = is_inner_plannable(&stmt)
741            || matches!(&stmt, Statement::Explain(explain) if is_inner_plannable(explain.statement.as_ref()));
742
743        if plannable {
744            self.plugins
745                .get::<PermissionCheckerRef>()
746                .as_ref()
747                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
748                .context(PermissionSnafu)?;
749
750            let plan = self
751                .query_engine
752                .planner()
753                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
754                .await
755                .context(PlanStatementSnafu)?;
756            self.query_engine
757                .describe(plan, query_ctx)
758                .await
759                .map(Some)
760                .context(error::DescribeStatementSnafu)
761        } else {
762            Ok(None)
763        }
764    }
765
766    async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
767        self.catalog_manager
768            .schema_exists(catalog, schema, None)
769            .await
770            .context(error::CatalogSnafu)
771    }
772}
773
774#[async_trait]
775impl SqlQueryHandler for Instance {
776    async fn do_query(
777        &self,
778        query: &str,
779        query_ctx: QueryContextRef,
780    ) -> Vec<server_error::Result<Output>> {
781        self.do_query_inner(query, query_ctx)
782            .await
783            .into_iter()
784            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
785            .collect()
786    }
787
788    async fn do_exec_plan(
789        &self,
790        plan: LogicalPlan,
791        stmt: Option<Statement>,
792        query_ctx: QueryContextRef,
793    ) -> server_error::Result<Output> {
794        self.do_exec_plan_inner(plan, stmt, query_ctx)
795            .await
796            .map_err(BoxedError::new)
797            .context(server_error::ExecutePlanSnafu)
798    }
799
800    async fn do_promql_query(
801        &self,
802        query: &PromQuery,
803        query_ctx: QueryContextRef,
804    ) -> Vec<server_error::Result<Output>> {
805        self.do_promql_query_inner(query, query_ctx)
806            .await
807            .into_iter()
808            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
809            .collect()
810    }
811
812    async fn do_describe(
813        &self,
814        stmt: Statement,
815        query_ctx: QueryContextRef,
816    ) -> server_error::Result<Option<DescribeResult>> {
817        self.do_describe_inner(stmt, query_ctx)
818            .await
819            .map_err(BoxedError::new)
820            .context(server_error::DescribeStatementSnafu)
821    }
822
823    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
824        self.is_valid_schema_inner(catalog, schema)
825            .await
826            .map_err(BoxedError::new)
827            .context(server_error::CheckDatabaseValiditySnafu)
828    }
829}
830
831/// Attaches a timer to the output and observes it once the output is exhausted.
832pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
833    match output.data {
834        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
835        OutputData::Stream(stream) => {
836            let stream = OnDone::new(stream, move || {
837                timer.observe_duration();
838            });
839            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
840        }
841    }
842}
843
844#[async_trait]
845impl PrometheusHandler for Instance {
846    #[tracing::instrument(skip_all)]
847    async fn do_query(
848        &self,
849        query: &PromQuery,
850        query_ctx: QueryContextRef,
851    ) -> server_error::Result<Output> {
852        let interceptor = self
853            .plugins
854            .get::<PromQueryInterceptorRef<server_error::Error>>();
855
856        self.plugins
857            .get::<PermissionCheckerRef>()
858            .as_ref()
859            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
860            .context(AuthSnafu)?;
861
862        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
863            ParsePromQLSnafu {
864                query: query.clone(),
865            }
866        })?;
867
868        let plan = self
869            .statement_executor
870            .plan(&stmt, query_ctx.clone())
871            .await
872            .map_err(BoxedError::new)
873            .context(ExecuteQuerySnafu)?;
874
875        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
876
877        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
878        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
879            CatalogQueryStatement::Promql(eval_stmt, alias)
880        } else {
881            // It should not happen since the query is already parsed successfully.
882            return UnexpectedResultSnafu {
883                reason: "The query should always be promql.".to_string(),
884            }
885            .fail();
886        };
887        let query = query_statement.to_string();
888
889        let slow_query_timer = self
890            .slow_query_options
891            .enable
892            .then(|| self.event_recorder.clone())
893            .flatten()
894            .map(|event_recorder| {
895                SlowQueryTimer::new(
896                    query_statement,
897                    self.slow_query_options.threshold,
898                    self.slow_query_options.sample_ratio,
899                    self.slow_query_options.record_type,
900                    event_recorder,
901                )
902            });
903
904        let ticket = self.process_manager.register_query(
905            query_ctx.current_catalog().to_string(),
906            vec![query_ctx.current_schema()],
907            query,
908            query_ctx.conn_info().to_string(),
909            Some(query_ctx.process_id()),
910            slow_query_timer,
911        );
912
913        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
914
915        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
916            .await
917            .map_err(|_| servers::error::CancelledSnafu.build())?
918            .map(|output| {
919                let Output { meta, data } = output;
920                let data = match data {
921                    OutputData::Stream(stream) => {
922                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
923                    }
924                    other => other,
925                };
926                Output { data, meta }
927            })
928            .map_err(BoxedError::new)
929            .context(ExecuteQuerySnafu)?;
930
931        Ok(interceptor.post_execute(output, query_ctx)?)
932    }
933
934    async fn query_metric_names(
935        &self,
936        matchers: Vec<Matcher>,
937        ctx: &QueryContextRef,
938    ) -> server_error::Result<Vec<String>> {
939        self.handle_query_metric_names(matchers, ctx)
940            .await
941            .map_err(BoxedError::new)
942            .context(ExecuteQuerySnafu)
943    }
944
945    async fn query_label_values(
946        &self,
947        metric: String,
948        label_name: String,
949        matchers: Vec<Matcher>,
950        start: SystemTime,
951        end: SystemTime,
952        ctx: &QueryContextRef,
953    ) -> server_error::Result<Vec<String>> {
954        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
955            .await
956            .map_err(BoxedError::new)
957            .context(ExecuteQuerySnafu)
958    }
959
960    fn catalog_manager(&self) -> CatalogManagerRef {
961        self.catalog_manager.clone()
962    }
963}
964
/// Validates access to `$stmt.database` when the statement names a database.
///
/// The database is checked against the current catalog of `$query_ctx` with
/// `validate_catalog_and_schema`. On failure the macro returns early from the
/// enclosing function (via `?`) with the `SqlExecIntercepted` error. A
/// statement without a database is accepted as is.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = $stmt.database.as_ref() {
            let validated =
                validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx);
            validated
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
975
976pub fn check_permission(
977    plugins: Plugins,
978    stmt: &Statement,
979    query_ctx: &QueryContextRef,
980) -> Result<()> {
981    let need_validate = plugins
982        .get::<QueryOptions>()
983        .map(|opts| opts.disallow_cross_catalog_query)
984        .unwrap_or_default();
985
986    if !need_validate {
987        return Ok(());
988    }
989
990    match stmt {
991        // Will be checked in execution.
992        // TODO(dennis): add a hook for admin commands.
993        Statement::Admin(_) => {}
994        // These are executed by query engine, and will be checked there.
995        Statement::Query(_)
996        | Statement::Explain(_)
997        | Statement::Tql(_)
998        | Statement::Delete(_)
999        | Statement::DeclareCursor(_)
1000        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
1001        // database ops won't be checked
1002        Statement::CreateDatabase(_)
1003        | Statement::ShowDatabases(_)
1004        | Statement::DropDatabase(_)
1005        | Statement::AlterDatabase(_)
1006        | Statement::DropFlow(_)
1007        | Statement::Use(_) => {}
1008        #[cfg(feature = "enterprise")]
1009        Statement::DropTrigger(_) => {}
1010        Statement::ShowCreateDatabase(stmt) => {
1011            validate_database(&stmt.database_name, query_ctx)?;
1012        }
1013        Statement::ShowCreateTable(stmt) => {
1014            validate_param(&stmt.table_name, query_ctx)?;
1015        }
1016        Statement::ShowCreateFlow(stmt) => {
1017            validate_flow(&stmt.flow_name, query_ctx)?;
1018        }
1019        #[cfg(feature = "enterprise")]
1020        Statement::ShowCreateTrigger(stmt) => {
1021            validate_param(&stmt.trigger_name, query_ctx)?;
1022        }
1023        Statement::ShowCreateView(stmt) => {
1024            validate_param(&stmt.view_name, query_ctx)?;
1025        }
1026        Statement::CreateExternalTable(stmt) => {
1027            validate_param(&stmt.name, query_ctx)?;
1028        }
1029        Statement::CreateFlow(stmt) => {
1030            // TODO: should also validate source table name here?
1031            validate_param(&stmt.sink_table_name, query_ctx)?;
1032        }
1033        #[cfg(feature = "enterprise")]
1034        Statement::CreateTrigger(stmt) => {
1035            validate_param(&stmt.trigger_name, query_ctx)?;
1036        }
1037        Statement::CreateView(stmt) => {
1038            validate_param(&stmt.name, query_ctx)?;
1039        }
1040        Statement::AlterTable(stmt) => {
1041            validate_param(stmt.table_name(), query_ctx)?;
1042        }
1043        #[cfg(feature = "enterprise")]
1044        Statement::AlterTrigger(_) => {}
1045        // set/show variable now only alter/show variable in session
1046        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
1047        // show charset and show collation won't be checked
1048        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
1049
1050        Statement::Comment(comment) => match &comment.object {
1051            CommentObject::Table(table) => validate_param(table, query_ctx)?,
1052            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
1053            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
1054        },
1055
1056        Statement::Insert(insert) => {
1057            let name = insert.table_name().context(ParseSqlSnafu)?;
1058            validate_param(name, query_ctx)?;
1059        }
1060        Statement::CreateTable(stmt) => {
1061            validate_param(&stmt.name, query_ctx)?;
1062        }
1063        Statement::CreateTableLike(stmt) => {
1064            validate_param(&stmt.table_name, query_ctx)?;
1065            validate_param(&stmt.source_name, query_ctx)?;
1066        }
1067        Statement::DropTable(drop_stmt) => {
1068            for table_name in drop_stmt.table_names() {
1069                validate_param(table_name, query_ctx)?;
1070            }
1071        }
1072        Statement::DropView(stmt) => {
1073            validate_param(&stmt.view_name, query_ctx)?;
1074        }
1075        Statement::ShowTables(stmt) => {
1076            validate_db_permission!(stmt, query_ctx);
1077        }
1078        Statement::ShowTableStatus(stmt) => {
1079            validate_db_permission!(stmt, query_ctx);
1080        }
1081        Statement::ShowColumns(stmt) => {
1082            validate_db_permission!(stmt, query_ctx);
1083        }
1084        Statement::ShowIndex(stmt) => {
1085            validate_db_permission!(stmt, query_ctx);
1086        }
1087        Statement::ShowRegion(stmt) => {
1088            validate_db_permission!(stmt, query_ctx);
1089        }
1090        Statement::ShowViews(stmt) => {
1091            validate_db_permission!(stmt, query_ctx);
1092        }
1093        Statement::ShowFlows(stmt) => {
1094            validate_db_permission!(stmt, query_ctx);
1095        }
1096        #[cfg(feature = "enterprise")]
1097        Statement::ShowTriggers(_stmt) => {
1098            // The trigger is organized based on the catalog dimension, so there
1099            // is no need to check the permission of the database(schema).
1100        }
1101        Statement::ShowStatus(_stmt) => {}
1102        Statement::ShowSearchPath(_stmt) => {}
1103        Statement::DescribeTable(stmt) => {
1104            validate_param(stmt.name(), query_ctx)?;
1105        }
1106        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
1107            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
1108            CopyTable::From(copy_table_from) => {
1109                validate_param(&copy_table_from.table_name, query_ctx)?
1110            }
1111        },
1112        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
1113            match copy_database {
1114                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1115                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1116            }
1117        }
1118        Statement::TruncateTable(stmt) => {
1119            validate_param(stmt.table_name(), query_ctx)?;
1120        }
1121        // cursor operations are always allowed once it's created
1122        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
1123        // User can only kill process in their own catalog.
1124        Statement::Kill(_) => {}
1125        // SHOW PROCESSLIST
1126        Statement::ShowProcesslist(_) => {}
1127    }
1128    Ok(())
1129}
1130
1131fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1132    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1133        .map_err(BoxedError::new)
1134        .context(ExternalSnafu)?;
1135
1136    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1137        .map_err(BoxedError::new)
1138        .context(SqlExecInterceptedSnafu)
1139}
1140
1141fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1142    let catalog = match &name.0[..] {
1143        [_flow] => query_ctx.current_catalog().to_string(),
1144        [catalog, _flow] => catalog.to_string_unquoted(),
1145        _ => {
1146            return InvalidSqlSnafu {
1147                err_msg: format!(
1148                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1149                ),
1150            }
1151            .fail();
1152        }
1153    };
1154
1155    let schema = query_ctx.current_schema();
1156
1157    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1158        .map_err(BoxedError::new)
1159        .context(SqlExecInterceptedSnafu)
1160}
1161
1162fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1163    let (catalog, schema) = match &name.0[..] {
1164        [schema] => (
1165            query_ctx.current_catalog().to_string(),
1166            schema.to_string_unquoted(),
1167        ),
1168        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1169        _ => InvalidSqlSnafu {
1170            err_msg: format!(
1171                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1172            ),
1173        }
1174        .fail()?,
1175    };
1176
1177    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1178        .map_err(BoxedError::new)
1179        .context(SqlExecInterceptedSnafu)
1180}
1181
1182fn is_readonly_plan(plan: &LogicalPlan) -> bool {
1183    !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
1184}
1185
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    /// Exercises `fast_legacy_check` on cache hits, cache misses, incompatible
    /// modes, and heavy concurrent access to a shared cache entry.
    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        // Create a DashMap to simulate the cache
        let cache = DashMap::new();

        // Pre-populate cache with some entries
        cache.insert("metric1".to_string(), true); // legacy mode
        cache.insert("metric2".to_string(), false); // prom mode
        cache.insert("metric3".to_string(), true); // legacy mode

        // Test case 1: Normal operation with cache hits
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true)); // should return legacy mode

        // Verify that metric4 was added to cache
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Test case 2: No cache hits
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None); // should return None as no cache hits

        // Test case 3: Incompatible modes should return error
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true); // legacy
        cache_incompatible.insert("metric2".to_string(), false); // prom
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err()); // should error due to incompatible modes

        // Test case 4: Intensive concurrent access to test deadlock prevention
        // This test specifically targets the scenario where multiple threads
        // access the same cache entries simultaneously
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Wait for all threads to be ready
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        // Each operation references existing cache entry and adds new ones
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // If the test takes too long, it likely means deadlock
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        // `JoinHandle::join` blocks until the thread exits, so a deadlocked
        // worker would hang this test forever and a timeout checked after the
        // join could never fire. Poll `is_finished` against a deadline instead
        // so that a deadlock surfaces as a test failure.
        let deadline = Instant::now() + Duration::from_secs(30);
        for (i, handle) in handles.into_iter().enumerate() {
            while !handle.is_finished() {
                if Instant::now() >= deadline {
                    panic!("Test timed out - possible deadlock detected!");
                }
                thread::sleep(Duration::from_millis(1));
            }

            // The worker has already finished, so this join does not block.
            if handle.join().is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        // Verify all operations completed successfully
        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Verify that many new entries were added (proving operations completed)
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    /// Checks `check_permission` with cross-catalog queries disallowed: names in
    /// the current catalog are accepted and names in another catalog rejected.
    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        /// Expands `template_sql` with accepted and rejected catalog/schema
        /// prefixes and checks the permission result of each variant.
        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // test right
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        /// Substitutes `{catalog}` and `{schema}` in `template`.
        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        /// Parses `sql` and asserts that checking its first statement succeeds
        /// (`is_ok == true`) or fails (`is_ok == false`).
        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // test insert
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // test drop table
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test show tables
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // test describe table
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        let comment_flow_cases = [
            ("COMMENT ON FLOW my_flow IS 'comment';", true),
            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
        ];
        for (sql, is_ok) in comment_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }

        let show_flow_cases = [
            ("SHOW CREATE FLOW my_flow;", true),
            ("SHOW CREATE FLOW greptime.my_flow;", true),
            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
        ];
        for (sql, is_ok) in show_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }
    }
}