1pub mod builder;
16mod dashboard;
17mod grpc;
18mod influxdb;
19mod jaeger;
20mod log_handler;
21mod logs;
22mod opentsdb;
23mod otlp;
24pub mod prom_store;
25mod promql;
26mod region_query;
27pub mod standalone;
28
29use std::pin::Pin;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, atomic};
32use std::time::{Duration, SystemTime};
33
34use async_stream::stream;
35use async_trait::async_trait;
36use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
37use catalog::CatalogManagerRef;
38use catalog::process_manager::{
39 ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
40};
41use client::OutputData;
42use common_base::Plugins;
43use common_base::cancellation::CancellableFuture;
44use common_error::ext::{BoxedError, ErrorExt};
45use common_event_recorder::EventRecorderRef;
46use common_meta::cache_invalidator::CacheInvalidatorRef;
47use common_meta::key::TableMetadataManagerRef;
48use common_meta::key::table_name::TableNameKey;
49use common_meta::node_manager::NodeManagerRef;
50use common_meta::procedure_executor::ProcedureExecutorRef;
51use common_query::Output;
52use common_recordbatch::RecordBatchStreamWrapper;
53use common_recordbatch::error::StreamTimeoutSnafu;
54use common_telemetry::logging::SlowQueryOptions;
55use common_telemetry::{debug, error, tracing};
56use dashmap::DashMap;
57use datafusion_expr::LogicalPlan;
58use futures::{Stream, StreamExt};
59use lazy_static::lazy_static;
60use operator::delete::DeleterRef;
61use operator::insert::InserterRef;
62use operator::statement::{StatementExecutor, StatementExecutorRef};
63use partition::manager::PartitionRuleManagerRef;
64use pipeline::pipeline_operator::PipelineOperator;
65use prometheus::HistogramTimer;
66use promql_parser::label::Matcher;
67use query::QueryEngineRef;
68use query::metrics::OnDone;
69use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
70use query::query_engine::DescribeResult;
71use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
72use servers::error::{
73 self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
74 OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
75};
76use servers::interceptor::{
77 PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
78};
79use servers::otlp::metrics::legacy_normalize_otlp_name;
80use servers::prometheus_handler::PrometheusHandler;
81use servers::query_handler::sql::SqlQueryHandler;
82use session::context::{Channel, QueryContextRef};
83use session::table_name::table_idents_to_full_name;
84use snafu::prelude::*;
85use sql::ast::ObjectNamePartExt;
86use sql::dialect::Dialect;
87use sql::parser::{ParseOptions, ParserContext};
88use sql::statements::comment::CommentObject;
89use sql::statements::copy::{CopyDatabase, CopyTable};
90use sql::statements::statement::Statement;
91use sql::statements::tql::Tql;
92use sqlparser::ast::ObjectName;
93pub use standalone::StandaloneDatanodeManager;
94use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
95use tracing::Span;
96
97use crate::error::{
98 self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
99 ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
100 StatementTimeoutSnafu, TableOperationSnafu,
101};
102use crate::stream_wrapper::CancellableStreamWrapper;
103
104lazy_static! {
105 static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
106}
107
#[derive(Clone)]
pub struct Instance {
    /// Advertised address identifying this frontend node.
    frontend_peer_addr: String,
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    statement_executor: Arc<StatementExecutor>,
    query_engine: QueryEngineRef,
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    table_metadata_manager: TableMetadataManagerRef,
    /// Optional sink for slow-query events; slow-query timers are only armed
    /// when this is `Some` (see `query_statement`).
    event_recorder: Option<EventRecorderRef>,
    process_manager: ProcessManagerRef,
    slow_query_options: SlowQueryOptions,
    /// When set to `true`, new queries are rejected with a "suspended" error
    /// (checked via `is_suspended`).
    suspend: Arc<AtomicBool>,

    /// Per-database cache of OTLP "legacy naming" flags: outer key is the db
    /// string, inner map is metric name -> is-legacy.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
133
impl Instance {
    /// Returns the advertised peer address of this frontend node.
    pub fn frontend_peer_addr(&self) -> &str {
        &self.frontend_peer_addr
    }

    /// Returns the catalog manager.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Returns the query engine.
    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    /// Returns the plugin registry.
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    /// Returns the statement executor.
    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    /// Returns the table metadata manager.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    /// Returns the inserter.
    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    /// Returns the process manager used to register/cancel queries.
    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    /// Returns the node manager (delegated to the inserter).
    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    /// Returns the partition rule manager (delegated to the inserter).
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    /// Returns the cache invalidator (delegated to the statement executor).
    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    /// Returns the procedure executor (delegated to the statement executor).
    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }

    /// Returns a handle to the shared suspend flag.
    pub fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }

    /// Whether this instance currently rejects new queries.
    pub(crate) fn is_suspended(&self) -> bool {
        self.suspend.load(atomic::Ordering::Relaxed)
    }
}
191
192fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
193 ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
194}
195
impl Instance {
    /// Executes one parsed SQL statement: checks permissions, and for
    /// read-only statements registers the query with the process manager so it
    /// can be cancelled and (optionally) timed as a slow query.
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if stmt.is_readonly() {
            // Only armed when slow-query tracking is enabled AND an event
            // recorder was configured.
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            // Register so the query appears in the process list and can be
            // cancelled through the returned ticket.
            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    // A streaming result keeps running after the future
                    // resolves, so it must stay cancellable via the same
                    // ticket.
                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            // Non-readonly statements are executed directly, without
            // cancellation registration.
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

    /// Runs the statement under the session's query timeout, when one applies
    /// to the current channel; the remaining budget is re-attached to a
    /// streaming result.
    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // The result stream may keep producing batches; give it only
                // the budget that is left.
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    /// Dispatches a statement to its execution path: planned SQL
    /// (query/explain/delete), TQL, or direct statement execution.
    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                // Propagate an EXPLAIN ... FORMAT choice into the query
                // context before planning.
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                // No logical plan for these statements: run the interceptor
                // without one, then execute directly.
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    /// Plans a SQL statement, lets the interceptor see the plan, and executes
    /// it.
    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        // The wrapping above is only for planning; recover the inner
        // statement (no other variant is constructed here).
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;

        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Plans a TQL statement, lets the interceptor see the plan, and executes
    /// it.
    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Decides whether the given OTLP metric names use "legacy" naming mode in
    /// the current database.
    ///
    /// A per-database in-memory cache is consulted first; on a miss the
    /// tables' stored `OTLP_METRIC_COMPAT_KEY` option is looked up. Mixing
    /// legacy and prom-compatible tables in one request is rejected.
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // Release the DashMap entry guard before awaiting metadata I/O, so we
        // never hold a shard lock across an .await.
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // Metadata is keyed by the legacy-normalized table name.
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // No such tables exist yet: treat as non-legacy and cache the
        // decision for every requested name.
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        // Tables without the compat option default to "legacy".
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            // One request must not target both naming modes at once.
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
}
443
444fn fast_legacy_check(
445 cache: &DashMap<String, bool>,
446 names: &[&String],
447) -> server_error::Result<Option<bool>> {
448 let hit_cache = names
449 .iter()
450 .filter_map(|name| cache.get(*name))
451 .collect::<Vec<_>>();
452 if !hit_cache.is_empty() {
453 let hit_legacy = hit_cache.iter().any(|en| *en.value());
454 let hit_prom = hit_cache.iter().any(|en| !*en.value());
455
456 ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
460
461 let flag = hit_legacy;
462 drop(hit_cache);
464
465 names.iter().for_each(|name| {
467 if !cache.contains_key(*name) {
468 cache.insert((*name).clone(), flag);
469 }
470 });
471 Ok(Some(flag))
472 } else {
473 Ok(None)
474 }
475}
476
477fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
480 let query_timeout = query_ctx.query_timeout()?;
481 if query_timeout.is_zero() {
482 return None;
483 }
484 match query_ctx.channel() {
485 Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
486 Channel::Postgres => Some(query_timeout),
487 _ => None,
488 }
489}
490
491fn derive_timeout_for_plan(plan: &LogicalPlan, query_ctx: &QueryContextRef) -> Option<Duration> {
493 let query_timeout = query_ctx.query_timeout()?;
494 if query_timeout.is_zero() {
495 return None;
496 }
497 match query_ctx.channel() {
498 Channel::Mysql if is_readonly_plan(plan) => Some(query_timeout),
499 Channel::Postgres => Some(query_timeout),
500 _ => None,
501 }
502}
503
/// Applies the remaining timeout budget to `output`.
///
/// A zero budget fails immediately. Streaming outputs are wrapped so that
/// every `next()` is bounded by the remaining budget, which is decremented as
/// batches are produced; materialized outputs pass through unchanged.
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    // Shrink the budget by the time this batch took; once it
                    // hits zero the stream errors out.
                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
                span: Span::current(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}
540
impl Instance {
    /// Inner implementation of `SqlQueryHandler::do_query`: parses the query,
    /// checks per-statement permissions and executes each statement in order,
    /// stopping at the first error.
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        // The interceptor may rewrite the raw query text before parsing.
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    // Stop at the first permission failure; earlier results
                    // are still returned.
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            // Abort the remaining statements on failure.
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    /// Executes a logical plan on the query engine.
    async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        self.query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)
    }

    /// Executes a logical plan under the session's query timeout (when one
    /// applies), re-attaching the leftover budget to a streaming result.
    async fn exec_plan_with_timeout(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let timeout = derive_timeout_for_plan(&plan, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
                    .await
                    .map_err(|_| StatementTimeoutSnafu.build())??;
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => self.exec_plan(plan, query_ctx).await,
        }
    }

    /// Inner implementation of `SqlQueryHandler::do_exec_plan`: runs an
    /// already-planned query, with cancellation/slow-query registration for
    /// read-only plans, and interceptor hooks around execution.
    async fn do_exec_plan_inner(
        &self,
        plan: LogicalPlan,
        stmt: Option<Statement>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();

        if let Some(ref s) = stmt {
            query_interceptor.pre_execute(s, Some(&plan), query_ctx.clone())?;
        }

        // Human-readable query text for the process list: the original SQL if
        // available, otherwise the rendered plan.
        let query = stmt
            .as_ref()
            .map(|s| s.to_string())
            .unwrap_or_else(|| plan.display_indent().to_string());

        let result = if is_readonly_plan(&plan) {
            // Only armed when slow-query tracking is enabled AND an event
            // recorder was configured.
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Plan(query.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_plan_with_timeout(plan, query_ctx.clone());

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    // Keep streaming results cancellable via the same ticket.
                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_plan_with_timeout(plan, query_ctx.clone()).await
        };

        result.and_then(|output| query_interceptor.post_execute(output, query_ctx))
    }

    /// Inner implementation of `SqlQueryHandler::do_promql_query`: delegates
    /// to the `PrometheusHandler` implementation and wraps the single result.
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
    async fn do_promql_query_inner(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }

    /// Inner implementation of `SqlQueryHandler::do_describe`: returns the
    /// described schema for plannable statements, `None` otherwise.
    async fn do_describe_inner(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        // Only these statement kinds produce a logical plan worth describing;
        // EXPLAIN of one of them also qualifies.
        let is_inner_plannable = |s: &Statement| {
            matches!(
                s,
                Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
            )
        };
        let plannable = is_inner_plannable(&stmt)
            || matches!(&stmt, Statement::Explain(explain) if is_inner_plannable(explain.statement.as_ref()));

        if plannable {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    /// Checks whether the given catalog/schema pair exists.
    async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}
771
772#[async_trait]
773impl SqlQueryHandler for Instance {
774 async fn do_query(
775 &self,
776 query: &str,
777 query_ctx: QueryContextRef,
778 ) -> Vec<server_error::Result<Output>> {
779 self.do_query_inner(query, query_ctx)
780 .await
781 .into_iter()
782 .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
783 .collect()
784 }
785
786 async fn do_exec_plan(
787 &self,
788 plan: LogicalPlan,
789 stmt: Option<Statement>,
790 query_ctx: QueryContextRef,
791 ) -> server_error::Result<Output> {
792 self.do_exec_plan_inner(plan, stmt, query_ctx)
793 .await
794 .map_err(BoxedError::new)
795 .context(server_error::ExecutePlanSnafu)
796 }
797
798 async fn do_promql_query(
799 &self,
800 query: &PromQuery,
801 query_ctx: QueryContextRef,
802 ) -> Vec<server_error::Result<Output>> {
803 self.do_promql_query_inner(query, query_ctx)
804 .await
805 .into_iter()
806 .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
807 .collect()
808 }
809
810 async fn do_describe(
811 &self,
812 stmt: Statement,
813 query_ctx: QueryContextRef,
814 ) -> server_error::Result<Option<DescribeResult>> {
815 self.do_describe_inner(stmt, query_ctx)
816 .await
817 .map_err(BoxedError::new)
818 .context(server_error::DescribeStatementSnafu)
819 }
820
821 async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
822 self.is_valid_schema_inner(catalog, schema)
823 .await
824 .map_err(BoxedError::new)
825 .context(server_error::CheckDatabaseValiditySnafu)
826 }
827}
828
829pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
831 match output.data {
832 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
833 OutputData::Stream(stream) => {
834 let stream = OnDone::new(stream, move || {
835 timer.observe_duration();
836 });
837 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
838 }
839 }
840}
841
#[async_trait]
impl PrometheusHandler for Instance {
    /// Executes a PromQL query: permission check, parse, plan, interceptor
    /// hooks, and execution under cancellation/slow-query registration.
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        // parse_promql is expected to always yield a Promql statement; any
        // other variant is a bug surfaced as an error.
        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
            CatalogQueryStatement::Promql(eval_stmt, alias)
        } else {
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        let query = query_statement.to_string();

        // Only armed when slow-query tracking is enabled AND an event
        // recorder was configured.
        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        // Register so the query shows up in the process list and can be
        // cancelled via the ticket.
        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                // Keep streaming results cancellable via the same ticket.
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }

    /// Returns the metric names matching the given matchers.
    async fn query_metric_names(
        &self,
        matchers: Vec<Matcher>,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_metric_names(matchers, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    /// Returns the values of `label_name` for `metric` within the time range.
    async fn query_label_values(
        &self,
        metric: String,
        label_name: String,
        matchers: Vec<Matcher>,
        start: SystemTime,
        end: SystemTime,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    /// Returns the catalog manager.
    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
}
962
/// If the statement carries an explicit `database` field, verifies it refers
/// to the session's current catalog/schema; otherwise does nothing. Bails out
/// of the enclosing function with `SqlExecInterceptedSnafu` on mismatch.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
973
/// Validates that `stmt` only touches objects inside the session's current
/// catalog/schema, when cross-catalog queries are disallowed by
/// `QueryOptions::disallow_cross_catalog_query`. No-op otherwise.
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Admin statements are exempt from the cross-catalog check.
        Statement::Admin(_) => {}
        // Table references inside these are validated during planning.
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        // Database-level statements that need no extra validation here.
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_flow(&stmt.flow_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowCreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        // Session-scoped statements: nothing to validate.
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Comment(comment) => match &comment.object {
            CommentObject::Table(table) => validate_param(table, query_ctx)?,
            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
        },

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {
        }
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        Statement::Kill(_) => {}
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}
1128
1129fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1130 let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1131 .map_err(BoxedError::new)
1132 .context(ExternalSnafu)?;
1133
1134 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1135 .map_err(BoxedError::new)
1136 .context(SqlExecInterceptedSnafu)
1137}
1138
1139fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1140 let catalog = match &name.0[..] {
1141 [_flow] => query_ctx.current_catalog().to_string(),
1142 [catalog, _flow] => catalog.to_string_unquoted(),
1143 _ => {
1144 return InvalidSqlSnafu {
1145 err_msg: format!(
1146 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1147 ),
1148 }
1149 .fail();
1150 }
1151 };
1152
1153 let schema = query_ctx.current_schema();
1154
1155 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1156 .map_err(BoxedError::new)
1157 .context(SqlExecInterceptedSnafu)
1158}
1159
1160fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1161 let (catalog, schema) = match &name.0[..] {
1162 [schema] => (
1163 query_ctx.current_catalog().to_string(),
1164 schema.to_string_unquoted(),
1165 ),
1166 [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1167 _ => InvalidSqlSnafu {
1168 err_msg: format!(
1169 "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1170 ),
1171 }
1172 .fail()?,
1173 };
1174
1175 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1176 .map_err(BoxedError::new)
1177 .context(SqlExecInterceptedSnafu)
1178}
1179
1180fn is_readonly_plan(plan: &LogicalPlan) -> bool {
1181 !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
1182}
1183
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        let cache = DashMap::new();

        // Seed the cache with a consistent set of legacy flags.
        cache.insert("metric1".to_string(), true);
        cache.insert("metric2".to_string(), false);
        cache.insert("metric3".to_string(), true);

        // One known metric plus one unknown: the unknown metric should be
        // back-filled into the cache with the known metric's flag.
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true));
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // All-unknown metrics: nothing can be decided from the cache.
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Metrics with conflicting cached flags must be rejected.
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true);
        cache_incompatible.insert("metric2".to_string(), false);
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err());

        // Hammer the cache from several threads at once to surface any
        // lock-ordering / deadlock problem in `fast_legacy_check`.
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Release all threads at the same instant to maximize contention.
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        // Every call mixes the contended shared key with a
                        // thread-unique key that forces an insert.
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // Per-thread watchdog: bail out if we appear wedged.
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        // Join with an overall watchdog so a deadlock fails loudly instead of
        // hanging the test harness.
        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Each thread inserted unique keys, so the cache should have grown well
        // beyond the single seeded entry.
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        // Statements with no cross-catalog reference must all pass.
        let sql = r#"
            SELECT * FROM demo;
            EXPLAIN SELECT * FROM demo;
            CREATE DATABASE test_database;
            SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
            SHOW CREATE TABLE demo;
            ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        // Expands `{catalog}`/`{schema}` placeholders and checks that allowed
        // combinations pass while cross-catalog ones are rejected.
        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        // Substitutes the placeholders in `template` via strfmt.
        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        // Parses one statement and asserts the permission-check outcome.
        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // INSERT
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // CREATE TABLE
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // DROP TABLE
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // SHOW TABLES — schema-only references never cross catalogs.
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // DESC TABLE
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // COMMENT ON FLOW: bare and current-catalog names pass, foreign
        // catalogs are rejected.
        let comment_flow_cases = [
            ("COMMENT ON FLOW my_flow IS 'comment';", true),
            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
        ];
        for (sql, is_ok) in comment_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }

        // SHOW CREATE FLOW follows the same catalog rules.
        let show_flow_cases = [
            ("SHOW CREATE FLOW my_flow;", true),
            ("SHOW CREATE FLOW greptime.my_flow;", true),
            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
        ];
        for (sql, is_ok) in show_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }
    }
}