frontend/instance.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod builder;
mod grpc;
mod influxdb;
mod jaeger;
mod log_handler;
mod logs;
mod opentsdb;
mod otlp;
pub mod prom_store;
mod promql;
mod region_query;
pub mod standalone;

use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, SystemTime};

use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use catalog::process_manager::{
    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
};
use client::OutputData;
use common_base::Plugins;
use common_base::cancellation::CancellableFuture;
use common_error::ext::{BoxedError, ErrorExt};
use common_event_recorder::EventRecorderRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_name::TableNameKey;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_recordbatch::RecordBatchStreamWrapper;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_telemetry::logging::SlowQueryOptions;
use common_telemetry::{debug, error, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
use query::QueryEngineRef;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
use servers::error::{
    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
};
use servers::interceptor::{
    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::otlp::metrics::legacy_normalize_otlp_name;
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use snafu::prelude::*;
use sql::ast::ObjectNamePartExt;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::comment::CommentObject;
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};

use crate::error::{
    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
    StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::limiter::LimiterRef;
use crate::stream_wrapper::CancellableStreamWrapper;

lazy_static! {
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}
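
// Note: tables without an explicit `OTLP_METRIC_COMPAT_KEY` option fall back
// to this "legacy" value in `check_otlp_legacy` below.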

/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
#[derive(Clone)]
pub struct Instance {
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    statement_executor: Arc<StatementExecutor>,
    query_engine: QueryEngineRef,
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    table_metadata_manager: TableMetadataManagerRef,
    event_recorder: Option<EventRecorderRef>,
    limiter: Option<LimiterRef>,
    process_manager: ProcessManagerRef,
    slow_query_options: SlowQueryOptions,
    suspend: Arc<AtomicBool>,

    // Cache for OTLP metrics tables.
    // Outer key: db string; inner key: raw input metric name;
    // value: whether the metric's table runs in legacy mode.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}

impl Instance {
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }

    pub fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }

    pub(crate) fn is_suspended(&self) -> bool {
        self.suspend.load(atomic::Ordering::Relaxed)
    }
}

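/// Parses `sql` into a list of statements using the given dialect.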
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
}

impl Instance {
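    /// Executes a single parsed statement. Capturable (read-only) statements
    /// are registered with the process manager, wrapped in a cancellable
    /// future, and optionally tracked by a slow-query timer.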
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if should_capture_statement(Some(&stmt)) {
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

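    /// Executes the statement, enforcing the session's query timeout when one
    /// applies (see [`derive_timeout`]); any remaining budget is carried over
    /// to streaming output via [`attach_timeout`].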
    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // compute remaining timeout
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                // TODO: remove this when format is supported in datafusion
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

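    /// Decides whether the given OTLP metric names should be written in legacy
    /// mode, consulting the per-database cache first and falling back to table
    /// metadata. Fails if legacy and prom compat modes are mixed.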
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        // fast cache check
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // release cache reference to avoid lock contention
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // query legacy table names
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // No existing table was found; use the new mode.
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        // Existing tables were found; check their table options.
        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            // check value consistency
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            // No table info available; use the new mode.
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
}

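/// Cache-only fast path for the legacy check: returns `Some(flag)` when at
/// least one name hits the cache (propagating `flag` to the uncached names),
/// `None` when nothing is cached, and an error when cached entries disagree.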
fn fast_legacy_check(
    cache: &DashMap<String, bool>,
    names: &[&String],
) -> server_error::Result<Option<bool>> {
    let hit_cache = names
        .iter()
        .filter_map(|name| cache.get(*name))
        .collect::<Vec<_>>();
    if !hit_cache.is_empty() {
        let hit_legacy = hit_cache.iter().any(|en| *en.value());
        let hit_prom = hit_cache.iter().any(|en| !*en.value());

        // Hits contain both true and false: legacy and new modes are mixed.
        // We cannot handle this case, so return an error.
        // TODO: add doc links to the error message.
        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);

        let flag = hit_legacy;
        // Drop `hit_cache` to release its references before inserting, to avoid deadlock.
        drop(hit_cache);

        // Propagate the flag to all names.
        names.iter().for_each(|name| {
            if !cache.contains_key(*name) {
                cache.insert((*name).clone(), flag);
            }
        });
        Ok(Some(flag))
    } else {
        Ok(None)
    }
}

/// Returns the timeout to enforce for `stmt`, if any. When the session sets a
/// query timeout, it is enforced for all PostgreSQL statements; for MySQL it
/// applies only to read-only statements.
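/// For example (illustrative values only): with a session `query_timeout` of
/// 10s, a MySQL `SELECT` yields `Some(10s)` while a MySQL `INSERT` yields
/// `None`; a PostgreSQL session yields `Some(10s)` for either statement.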
fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
    let query_timeout = query_ctx.query_timeout()?;
    if query_timeout.is_zero() {
        return None;
    }
    match query_ctx.channel() {
        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
        Channel::Postgres => Some(query_timeout),
        _ => None,
    }
}

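/// Wraps a streaming [`Output`] so that each batch must arrive within the
/// remaining `timeout` budget, failing immediately when the budget is already
/// exhausted. Non-streaming outputs are returned unchanged.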
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    // tokio::time::timeout may not return an error immediately when timeout is 0.
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}

#[async_trait]
impl SqlQueryHandler for Instance {
    type Error = Error;

    #[tracing::instrument(skip_all)]
    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    async fn do_exec_plan(
        &self,
        stmt: Option<Statement>,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        if should_capture_statement(stmt.as_ref()) {
            // It's safe to unwrap here because we've already checked the type.
            let stmt = stmt.unwrap();
            let query = stmt.to_string();
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.query_engine.execute(plan.clone(), query_ctx);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
                .context(ExecLogicalPlanSnafu)
        } else {
            // The plan should have been prepared (and checked) before
            // execution; the query engine performs the checks there.
            self.query_engine
                .execute(plan.clone(), query_ctx)
                .await
                .context(ExecLogicalPlanSnafu)
        }
    }

    #[tracing::instrument(skip_all)]
    async fn do_promql_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        // Permission checks are done in the Prometheus handler's `do_query`.
        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }
    async fn do_describe(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        if matches!(
            stmt,
            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
        ) {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}

/// Attaches a timer to the output and observes it once the output is exhausted.
pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
    match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(stream) => {
            let stream = OnDone::new(stream, move || {
                timer.observe_duration();
            });
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    }
}

#[async_trait]
impl PrometheusHandler for Instance {
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
            CatalogQueryStatement::Promql(eval_stmt, alias)
        } else {
            // It should not happen since the query is already parsed successfully.
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        let query = query_statement.to_string();

        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }

    async fn query_metric_names(
        &self,
        matchers: Vec<Matcher>,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_metric_names(matchers, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    async fn query_label_values(
        &self,
        metric: String,
        label_name: String,
        matchers: Vec<Matcher>,
        start: SystemTime,
        end: SystemTime,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
}

/// Validates `stmt.database` permission if it's present.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}

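/// Rejects statements that reach outside the current catalog when
/// [`QueryOptions::disallow_cross_catalog_query`] is set. Statements executed
/// by the query engine are skipped here and checked during execution instead.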
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Will be checked in execution.
        // TODO(dennis): add a hook for admin commands.
        Statement::Admin(_) => {}
        // These are executed by query engine, and will be checked there.
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        // database ops won't be checked
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_flow(&stmt.flow_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowCreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            // TODO: should also validate source table name here?
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        // SET/SHOW VARIABLES currently only alter/show session variables.
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        // SHOW CHARSET and SHOW COLLATION won't be checked.
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Comment(comment) => match &comment.object {
            CommentObject::Table(table) => validate_param(table, query_ctx)?,
            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
        },

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {
            // The trigger is organized based on the catalog dimension, so there
            // is no need to check the permission of the database(schema).
        }
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        // Cursor operations are always allowed once the cursor is created.
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        // Users can only kill processes in their own catalog.
        Statement::Kill(_) => {}
        // SHOW PROCESSLIST
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}

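/// Resolves `name` to a fully qualified table name and validates its catalog
/// and schema against the query context.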
fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let catalog = match &name.0[..] {
        [_flow] => query_ctx.current_catalog().to_string(),
        [catalog, _flow] => catalog.to_string_unquoted(),
        _ => {
            return InvalidSqlSnafu {
                err_msg: format!(
                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
                ),
            }
            .fail();
        }
    };

    let schema = query_ctx.current_schema();

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema) = match &name.0[..] {
        [schema] => (
            query_ctx.current_catalog().to_string(),
            schema.to_string_unquoted(),
        ),
        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
        _ => InvalidSqlSnafu {
            err_msg: format!(
                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
            ),
        }
        .fail()?,
    };

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

/// Returns true when a query ticket and slow-query timer should be created,
/// i.e. when the statement is a query or another read-only statement.
fn should_capture_statement(stmt: Option<&Statement>) -> bool {
    if let Some(stmt) = stmt {
        matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        // Create a DashMap to simulate the cache
        let cache = DashMap::new();

        // Pre-populate cache with some entries
        cache.insert("metric1".to_string(), true); // legacy mode
        cache.insert("metric2".to_string(), false); // prom mode
        cache.insert("metric3".to_string(), true); // legacy mode

        // Test case 1: Normal operation with cache hits
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true)); // should return legacy mode

        // Verify that metric4 was added to cache
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Test case 2: No cache hits
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None); // should return None as no cache hits

        // Test case 3: Incompatible modes should return error
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true); // legacy
        cache_incompatible.insert("metric2".to_string(), false); // prom
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err()); // should error due to incompatible modes

        // Test case 4: Intensive concurrent access to test deadlock prevention
        // This test specifically targets the scenario where multiple threads
        // access the same cache entries simultaneously
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Wait for all threads to be ready
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        // Each operation references existing cache entry and adds new ones
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // If the test takes too long, it likely means deadlock
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        // Join all threads with timeout
        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            // Check if we're taking too long (potential deadlock)
            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        // Verify all operations completed successfully
        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Verify that many new entries were added (proving operations completed)
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // test right
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // test insert
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // test drop table
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test show tables
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // test describe table
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        let comment_flow_cases = [
            ("COMMENT ON FLOW my_flow IS 'comment';", true),
            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
        ];
        for (sql, is_ok) in comment_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }

        let show_flow_cases = [
            ("SHOW CREATE FLOW my_flow;", true),
            ("SHOW CREATE FLOW greptime.my_flow;", true),
            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
        ];
        for (sql, is_ok) in show_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }
    }
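
    // A minimal sketch exercising `should_capture_statement`; it assumes
    // `SELECT 1;` parses as `Statement::Query` under the default dialect.
    #[test]
    fn test_should_capture_statement() {
        let stmts = parse_stmt("SELECT 1;", &GreptimeDbDialect {}).unwrap();
        // Plain queries are captured (query ticket + optional slow-query timer).
        assert!(should_capture_statement(stmts.first()));
        // Without a statement there is nothing to capture.
        assert!(!should_capture_statement(None));
    }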
}