
frontend/instance.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod builder;
mod dashboard;
mod grpc;
mod influxdb;
mod jaeger;
mod log_handler;
mod logs;
mod opentsdb;
mod otlp;
pub mod prom_store;
mod promql;
mod region_query;
pub mod standalone;

use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, SystemTime};

use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use catalog::process_manager::{
    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
};
use client::OutputData;
use common_base::Plugins;
use common_base::cancellation::CancellableFuture;
use common_error::ext::{BoxedError, ErrorExt};
use common_event_recorder::EventRecorderRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_name::TableNameKey;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_recordbatch::RecordBatchStreamWrapper;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_telemetry::logging::SlowQueryOptions;
use common_telemetry::{debug, error, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
use query::QueryEngineRef;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
use servers::error::{
    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
};
use servers::interceptor::{
    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::otlp::metrics::legacy_normalize_otlp_name;
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use snafu::prelude::*;
use sql::ast::ObjectNamePartExt;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::comment::CommentObject;
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
use tracing::Span;

use crate::error::{
    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
    StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::stream_wrapper::CancellableStreamWrapper;

lazy_static! {
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}

/// The frontend instance contains the necessary components and implements many
/// traits, such as [`servers::query_handler::grpc::GrpcQueryHandler`],
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
#[derive(Clone)]
pub struct Instance {
    frontend_peer_addr: String,
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    statement_executor: Arc<StatementExecutor>,
    query_engine: QueryEngineRef,
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    table_metadata_manager: TableMetadataManagerRef,
    event_recorder: Option<EventRecorderRef>,
    process_manager: ProcessManagerRef,
    slow_query_options: SlowQueryOptions,
    suspend: Arc<AtomicBool>,

    // Cache for OTLP metrics.
    // Outer key: the db string; inner key: the metric name as received.
    // Inner value: whether the metric's table runs in legacy mode.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}

impl Instance {
    pub fn frontend_peer_addr(&self) -> &str {
        &self.frontend_peer_addr
    }

    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }

    pub fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }

    pub(crate) fn is_suspended(&self) -> bool {
        self.suspend.load(atomic::Ordering::Relaxed)
    }
}

fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
}

impl Instance {
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

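        // Read-only statements are registered with the process manager so they
        // show up in SHOW PROCESSLIST and can be cancelled; everything else
        // runs directly (still subject to the statement timeout below).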
        if stmt.is_readonly() {
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

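            // Run the query under the cancellation handle; if the result is a
            // stream, wrap it so that killing the query also stops the stream.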
            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // compute remaining timeout
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                // TODO: remove this when format is supported in datafusion
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;

        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

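    /// Decides whether the given OTLP metric names should be handled in legacy
    /// mode. Consults the per-database cache first, then falls back to table
    /// metadata and caches the result. Mixing legacy and Prometheus-compatible
    /// tables within one request is rejected as an error.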
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        // fast cache check
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // release the cache reference to avoid lock contention
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // look up table names in their legacy normalized form
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // No existing table was found; use the new mode.
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        // Existing tables were found; check their table options.
        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            // check that the option values are consistent
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            // no table info available; use the new mode
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
}

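/// Checks the per-database cache for the given metric names. Returns
/// `Some(flag)` when the cache hits agree on a mode, `None` when nothing is
/// cached, and an error when cached entries mix legacy and new modes.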
fn fast_legacy_check(
    cache: &DashMap<String, bool>,
    names: &[&String],
) -> server_error::Result<Option<bool>> {
    let hit_cache = names
        .iter()
        .filter_map(|name| cache.get(*name))
        .collect::<Vec<_>>();
    if !hit_cache.is_empty() {
        let hit_legacy = hit_cache.iter().any(|en| *en.value());
        let hit_prom = hit_cache.iter().any(|en| !*en.value());

        // Hits contain both true and false: the names span both legacy and
        // new modes. We cannot handle a mixed request, so return an error.
        // TODO: add doc links to the error message.
        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);

        let flag = hit_legacy;
        // drop hit_cache to release references before inserting to avoid deadlock
        drop(hit_cache);

        // set the cache for all names
        names.iter().for_each(|name| {
            if !cache.contains_key(*name) {
                cache.insert((*name).clone(), flag);
            }
        });
        Ok(Some(flag))
    } else {
        Ok(None)
    }
}

/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
/// For MySQL, it applies only to read-only statements.
fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
    let query_timeout = query_ctx.query_timeout()?;
    if query_timeout.is_zero() {
        return None;
    }
    match query_ctx.channel() {
        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
        Channel::Postgres => Some(query_timeout),
        _ => None,
    }
}

/// Derives the timeout for plan execution.
fn derive_timeout_for_plan(plan: &LogicalPlan, query_ctx: &QueryContextRef) -> Option<Duration> {
    let query_timeout = query_ctx.query_timeout()?;
    if query_timeout.is_zero() {
        return None;
    }
    match query_ctx.channel() {
        Channel::Mysql if is_readonly_plan(plan) => Some(query_timeout),
        Channel::Postgres => Some(query_timeout),
        _ => None,
    }
}

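/// Enforces the remaining `timeout` on a streaming output: each batch must
/// arrive within the remaining budget, which shrinks as batches are yielded.
/// Non-streaming outputs are returned unchanged; a zero budget fails with a
/// statement timeout immediately.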
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    // tokio::time::timeout may not return an error immediately when timeout is 0.
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
                span: Span::current(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}

impl Instance {
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

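                // Execute the statements sequentially; stop at the first
                // permission or execution error and return partial results.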
                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        self.query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)
    }

    async fn exec_plan_with_timeout(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let timeout = derive_timeout_for_plan(&plan, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
                    .await
                    .map_err(|_| StatementTimeoutSnafu.build())??;
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => self.exec_plan(plan, query_ctx).await,
        }
    }

    async fn do_exec_plan_inner(
        &self,
        plan: LogicalPlan,
        stmt: Option<Statement>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();

        if let Some(ref s) = stmt {
            query_interceptor.pre_execute(s, Some(&plan), query_ctx.clone())?;
        }

        let query = stmt
            .as_ref()
            .map(|s| s.to_string())
            .unwrap_or_else(|| plan.display_indent().to_string());

        let result = if is_readonly_plan(&plan) {
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Plan(query.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_plan_with_timeout(plan, query_ctx.clone());

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_plan_with_timeout(plan, query_ctx.clone()).await
        };

        result.and_then(|output| query_interceptor.post_execute(output, query_ctx))
    }

    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
    async fn do_promql_query_inner(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        // Permission checks are performed in the Prometheus handler's do_query.
        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }

    async fn do_describe_inner(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        // EXPLAIN / EXPLAIN ANALYZE wrap an inner statement; describe them when the
        // wrapped statement is something we already plan (so that bind parameters
        // in the inner query get their types inferred). See #8029.
        let is_inner_plannable = |s: &Statement| {
            matches!(
                s,
                Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
            )
        };
        let plannable = is_inner_plannable(&stmt)
            || matches!(&stmt, Statement::Explain(explain) if is_inner_plannable(explain.statement.as_ref()));

        if plannable {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}

#[async_trait]
impl SqlQueryHandler for Instance {
    async fn do_query(
        &self,
        query: &str,
        query_ctx: QueryContextRef,
    ) -> Vec<server_error::Result<Output>> {
        self.do_query_inner(query, query_ctx)
            .await
            .into_iter()
            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
            .collect()
    }

    async fn do_exec_plan(
        &self,
        plan: LogicalPlan,
        stmt: Option<Statement>,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        self.do_exec_plan_inner(plan, stmt, query_ctx)
            .await
            .map_err(BoxedError::new)
            .context(server_error::ExecutePlanSnafu)
    }

    async fn do_promql_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<server_error::Result<Output>> {
        self.do_promql_query_inner(query, query_ctx)
            .await
            .into_iter()
            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
            .collect()
    }

    async fn do_describe(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Option<DescribeResult>> {
        self.do_describe_inner(stmt, query_ctx)
            .await
            .map_err(BoxedError::new)
            .context(server_error::DescribeStatementSnafu)
    }

    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
        self.is_valid_schema_inner(catalog, schema)
            .await
            .map_err(BoxedError::new)
            .context(server_error::CheckDatabaseValiditySnafu)
    }
}

/// Attaches a timer to the output and observes it once the output is exhausted.
pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
    match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(stream) => {
            let stream = OnDone::new(stream, move || {
                timer.observe_duration();
            });
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    }
}

#[async_trait]
impl PrometheusHandler for Instance {
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
            CatalogQueryStatement::Promql(eval_stmt, alias)
        } else {
            // This should not happen, since the query was already parsed successfully.
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        let query = query_statement.to_string();

        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }

    async fn query_metric_names(
        &self,
        matchers: Vec<Matcher>,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_metric_names(matchers, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    async fn query_label_values(
        &self,
        metric: String,
        label_name: String,
        matchers: Vec<Matcher>,
        start: SystemTime,
        end: SystemTime,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
}

/// Validates the permission of `stmt.database` if it is present.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}

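/// Checks whether `stmt` is allowed to run under `query_ctx` when
/// cross-catalog queries are disallowed: statements that reference objects
/// outside the session's catalog or schema are rejected.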
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Will be checked in execution.
        // TODO(dennis): add a hook for admin commands.
        Statement::Admin(_) => {}
        // These are executed by the query engine, and will be checked there.
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        // Database operations are not checked.
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_flow(&stmt.flow_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowCreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            // TODO: should the source table names be validated here as well?
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        // SET/SHOW variables only alter or show session-level variables.
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        // SHOW CHARSET and SHOW COLLATION are not checked.
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Comment(comment) => match &comment.object {
            CommentObject::Table(table) => validate_param(table, query_ctx)?,
            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
        },

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {
            // Triggers are organized along the catalog dimension, so there
            // is no need to check the database (schema) permission.
        }
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        // Cursor operations are always allowed once the cursor is created.
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        // Users can only kill processes in their own catalog.
        Statement::Kill(_) => {}
        // SHOW PROCESSLIST
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}

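/// Resolves `name` against the session context and validates that its
/// catalog and schema match the session's.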
fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let catalog = match &name.0[..] {
        [_flow] => query_ctx.current_catalog().to_string(),
        [catalog, _flow] => catalog.to_string_unquoted(),
        _ => {
            return InvalidSqlSnafu {
                err_msg: format!(
                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
                ),
            }
            .fail();
        }
    };

    let schema = query_ctx.current_schema();

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema) = match &name.0[..] {
        [schema] => (
            query_ctx.current_catalog().to_string(),
            schema.to_string_unquoted(),
        ),
        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
        _ => InvalidSqlSnafu {
            err_msg: format!(
                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
            ),
        }
        .fail()?,
    };

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

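/// A plan is considered read-only unless it is a DML or DDL plan.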
fn is_readonly_plan(plan: &LogicalPlan) -> bool {
    !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        // Create a DashMap to simulate the cache
        let cache = DashMap::new();

        // Pre-populate cache with some entries
        cache.insert("metric1".to_string(), true); // legacy mode
        cache.insert("metric2".to_string(), false); // prom mode
        cache.insert("metric3".to_string(), true); // legacy mode

        // Test case 1: Normal operation with cache hits
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true)); // should return legacy mode

        // Verify that metric4 was added to cache
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Test case 2: No cache hits
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None); // should return None as no cache hits

        // Test case 3: Incompatible modes should return error
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true); // legacy
        cache_incompatible.insert("metric2".to_string(), false); // prom
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err()); // should error due to incompatible modes

        // Test case 4: Intensive concurrent access to test deadlock prevention
        // This test specifically targets the scenario where multiple threads
        // access the same cache entries simultaneously
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Wait for all threads to be ready
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        // Each operation references existing cache entry and adds new ones
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // If the test takes too long, it likely indicates a deadlock.
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        // Join all threads with timeout
        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            // Check if we're taking too long (potential deadlock)
            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        // Verify all operations completed successfully
        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Verify that many new entries were added (proving operations completed)
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }
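
    // A minimal sketch of parse_stmt behavior, added for illustration: a
    // multi-statement string is split into one Statement per SQL statement.
    // It assumes the GreptimeDB dialect accepts these simple statements, as
    // the other tests in this module suggest.
    #[test]
    fn test_parse_stmt_splits_statements() {
        let sql = "SELECT 1; SHOW DATABASES;";
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
    }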

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // catalog/schema combinations that should pass
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            // catalog/schema combinations that should fail
            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // test insert
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // test drop table
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test show tables
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // test describe table
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        let comment_flow_cases = [
            ("COMMENT ON FLOW my_flow IS 'comment';", true),
            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
        ];
        for (sql, is_ok) in comment_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }

        let show_flow_cases = [
            ("SHOW CREATE FLOW my_flow;", true),
            ("SHOW CREATE FLOW greptime.my_flow;", true),
            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
        ];
        for (sql, is_ok) in show_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }
    }
}