1pub mod builder;
16mod dashboard;
17mod grpc;
18mod influxdb;
19mod jaeger;
20mod log_handler;
21mod logs;
22mod opentsdb;
23mod otlp;
24pub mod prom_store;
25mod promql;
26mod region_query;
27pub mod standalone;
28
29use std::pin::Pin;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, atomic};
32use std::time::{Duration, SystemTime};
33
34use async_stream::stream;
35use async_trait::async_trait;
36use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
37use catalog::CatalogManagerRef;
38use catalog::process_manager::{
39 ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
40};
41use client::OutputData;
42use common_base::Plugins;
43use common_base::cancellation::CancellableFuture;
44use common_error::ext::{BoxedError, ErrorExt};
45use common_event_recorder::EventRecorderRef;
46use common_meta::cache_invalidator::CacheInvalidatorRef;
47use common_meta::key::TableMetadataManagerRef;
48use common_meta::key::table_name::TableNameKey;
49use common_meta::node_manager::NodeManagerRef;
50use common_meta::procedure_executor::ProcedureExecutorRef;
51use common_query::Output;
52use common_recordbatch::RecordBatchStreamWrapper;
53use common_recordbatch::error::StreamTimeoutSnafu;
54use common_telemetry::logging::SlowQueryOptions;
55use common_telemetry::{debug, error, tracing};
56use dashmap::DashMap;
57use datafusion_expr::LogicalPlan;
58use futures::{Stream, StreamExt};
59use lazy_static::lazy_static;
60use operator::delete::DeleterRef;
61use operator::insert::InserterRef;
62use operator::statement::{StatementExecutor, StatementExecutorRef};
63use partition::manager::PartitionRuleManagerRef;
64use pipeline::pipeline_operator::PipelineOperator;
65use prometheus::HistogramTimer;
66use promql_parser::label::Matcher;
67use query::QueryEngineRef;
68use query::metrics::OnDone;
69use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
70use query::query_engine::DescribeResult;
71use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
72use servers::error::{
73 self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
74 OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
75};
76use servers::interceptor::{
77 PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
78};
79use servers::otlp::metrics::legacy_normalize_otlp_name;
80use servers::prometheus_handler::PrometheusHandler;
81use servers::query_handler::sql::SqlQueryHandler;
82use session::context::{Channel, QueryContextRef};
83use session::table_name::table_idents_to_full_name;
84use snafu::prelude::*;
85use sql::ast::ObjectNamePartExt;
86use sql::dialect::Dialect;
87use sql::parser::{ParseOptions, ParserContext};
88use sql::statements::comment::CommentObject;
89use sql::statements::copy::{CopyDatabase, CopyTable};
90use sql::statements::statement::Statement;
91use sql::statements::tql::Tql;
92use sqlparser::ast::ObjectName;
93pub use standalone::StandaloneDatanodeManager;
94use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
95use tracing::Span;
96
97use crate::error::{
98 self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
99 ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
100 StatementTimeoutSnafu, TableOperationSnafu,
101};
102use crate::service_config::InfluxdbMergeMode;
103use crate::stream_wrapper::CancellableStreamWrapper;
104
105lazy_static! {
106 static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
107}
108
/// Shared frontend state behind the SQL (`SqlQueryHandler`) and PromQL
/// (`PrometheusHandler`) query handlers implemented in this module.
#[derive(Clone)]
pub struct Instance {
    /// Peer address of this frontend, exposed via `frontend_peer_addr()`.
    frontend_peer_addr: String,
    /// Catalog access; also used to check schema existence.
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    /// Plans and executes SQL/TQL/PromQL statements.
    statement_executor: Arc<StatementExecutor>,
    /// Executes and describes logical plans.
    query_engine: QueryEngineRef,
    /// Plugin registry (interceptors, permission checker, `QueryOptions`).
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    /// Table name / table info metadata lookups.
    table_metadata_manager: TableMetadataManagerRef,
    /// Sink for slow-query events; slow-query timers are only built when this is set.
    event_recorder: Option<EventRecorderRef>,
    /// Registers running queries and hands out cancellation tickets.
    process_manager: ProcessManagerRef,
    /// Slow-query recording switch, threshold, sample ratio and record type.
    slow_query_options: SlowQueryOptions,
    /// NOTE(review): not read in this chunk; presumably consumed by the InfluxDB
    /// write path — confirm.
    influxdb_default_merge_mode: InfluxdbMergeMode,
    /// Shared suspension flag; while set, the query entry points reject requests
    /// with a `Suspended` error.
    suspend: Arc<AtomicBool>,

    /// Per-database cache (keyed by `QueryContext::get_db_string()`) mapping an OTLP
    /// metric name to whether it resolved to the legacy compat mode (`true`).
    ///
    /// NOTE(review): `#[derive(Clone)]` deep-copies this `DashMap`, so cloned
    /// `Instance`s do not share cache entries — confirm this is intended.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
135
136impl Instance {
137 pub fn frontend_peer_addr(&self) -> &str {
138 &self.frontend_peer_addr
139 }
140
141 pub fn catalog_manager(&self) -> &CatalogManagerRef {
142 &self.catalog_manager
143 }
144
145 pub fn query_engine(&self) -> &QueryEngineRef {
146 &self.query_engine
147 }
148
149 pub fn plugins(&self) -> &Plugins {
150 &self.plugins
151 }
152
153 pub fn statement_executor(&self) -> &StatementExecutorRef {
154 &self.statement_executor
155 }
156
157 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
158 &self.table_metadata_manager
159 }
160
161 pub fn inserter(&self) -> &InserterRef {
162 &self.inserter
163 }
164
165 pub fn process_manager(&self) -> &ProcessManagerRef {
166 &self.process_manager
167 }
168
169 pub fn node_manager(&self) -> &NodeManagerRef {
170 self.inserter.node_manager()
171 }
172
173 pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
174 self.inserter.partition_manager()
175 }
176
177 pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
178 self.statement_executor.cache_invalidator()
179 }
180
181 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
182 self.statement_executor.procedure_executor()
183 }
184
185 pub fn suspend_state(&self) -> Arc<AtomicBool> {
186 self.suspend.clone()
187 }
188
189 pub(crate) fn is_suspended(&self) -> bool {
190 self.suspend.load(atomic::Ordering::Relaxed)
191 }
192}
193
194fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
195 ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
196}
197
impl Instance {
    /// Executes a single parsed SQL statement.
    ///
    /// Cross-catalog access is validated first via `check_permission`. Statements for
    /// which `should_track_statement_process` is true are registered with the process
    /// manager so they can be cancelled; read-only ones additionally get a slow-query
    /// timer when slow-query recording is enabled and an event recorder is configured.
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        let is_readonly_stmt = stmt.is_readonly();
        if should_track_statement_process(&stmt) {
            // Only read-only statements are candidates for slow-query recording.
            let slow_query_timer = if is_readonly_stmt {
                self.slow_query_options
                    .enable
                    .then(|| self.event_recorder.clone())
                    .flatten()
                    .map(|event_recorder| {
                        SlowQueryTimer::new(
                            CatalogQueryStatement::Sql(stmt.clone()),
                            self.slow_query_options.threshold,
                            self.slow_query_options.sample_ratio,
                            self.slow_query_options.record_type,
                            event_recorder,
                        )
                    })
            } else {
                None
            };

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            // A cancelled ticket aborts the future with `Cancelled`. For streaming results
            // the ticket is moved into the stream wrapper so the stream stays cancellable
            // after this function returns.
            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

    /// Runs `exec_statement` under the timeout chosen by `derive_timeout`, if any.
    ///
    /// Time spent producing the `Output` is deducted from the budget, and the remainder
    /// is passed to `attach_timeout`, which also bounds consumption of streaming results.
    ///
    /// # Errors
    /// `StatementTimeout` when the budget runs out before an `Output` is produced.
    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    /// Dispatches a statement to the matching execution path.
    ///
    /// Queries, EXPLAIN and DELETE are planned and executed here; TQL goes through the
    /// TQL planner; everything else is handed to `StatementExecutor::execute_sql` after
    /// the interceptor's `pre_execute` hook.
    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                // Record an explicit EXPLAIN output format on the query context.
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                query_interceptor.pre_execute(Some(&stmt), None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    /// Plans a SQL statement, runs the interceptor's `pre_execute` hook with both the
    /// statement and the plan, then executes the plan.
    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        // Unwrap the statement again so the interceptor sees the original SQL.
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(Some(&stmt), Some(&plan), query_ctx.clone())?;

        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Plans a TQL statement, runs the interceptor's `pre_execute` hook, then executes
    /// the plan.
    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(
            Some(&Statement::Tql(tql)),
            Some(&plan),
            query_ctx.clone(),
        )?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Decides whether the OTLP metric `names` in the current database use the legacy
    /// compat mode (`true`) or not (`false`).
    ///
    /// The per-database cache is consulted first (`fast_legacy_check`). On a miss, the
    /// legacy-normalized names are looked up in table metadata, and each existing table's
    /// `OTLP_METRIC_COMPAT_KEY` option decides the mode. The option defaults to
    /// `OTLP_LEGACY_DEFAULT_VALUE` when absent. The answer is then cached for every name
    /// in `names`. When none of the tables exist, the answer is `false`.
    ///
    /// # Errors
    /// `OtlpMetricModeIncompatible` when existing tables mix the prom and legacy modes;
    /// `CommonMeta` when a metadata lookup fails.
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        // Fast path. `entry(..).or_default()` holds a guard on the outer map; it is
        // explicitly dropped below so no map lock is held across the `.await`s.
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // Look tables up by their legacy-normalized metric names.
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        if table_ids.is_empty() {
            // None of the tables exist: cache and report the non-legacy mode.
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        // No `.await` follows, so holding the entry guard from here on is fine.
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            // Options that are neither prom nor legacy count as non-legacy.
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
}
453
454fn fast_legacy_check(
455 cache: &DashMap<String, bool>,
456 names: &[&String],
457) -> server_error::Result<Option<bool>> {
458 let hit_cache = names
459 .iter()
460 .filter_map(|name| cache.get(*name))
461 .collect::<Vec<_>>();
462 if !hit_cache.is_empty() {
463 let hit_legacy = hit_cache.iter().any(|en| *en.value());
464 let hit_prom = hit_cache.iter().any(|en| !*en.value());
465
466 ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
470
471 let flag = hit_legacy;
472 drop(hit_cache);
474
475 names.iter().for_each(|name| {
477 if !cache.contains_key(*name) {
478 cache.insert((*name).clone(), flag);
479 }
480 });
481 Ok(Some(flag))
482 } else {
483 Ok(None)
484 }
485}
486
487fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
490 let query_timeout = query_ctx.query_timeout()?;
491 if query_timeout.is_zero() {
492 return None;
493 }
494 match query_ctx.channel() {
495 Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
496 Channel::Postgres => Some(query_timeout),
497 _ => None,
498 }
499}
500
501fn derive_timeout_for_plan(plan: &LogicalPlan, query_ctx: &QueryContextRef) -> Option<Duration> {
503 let query_timeout = query_ctx.query_timeout()?;
504 if query_timeout.is_zero() {
505 return None;
506 }
507 match query_ctx.channel() {
508 Channel::Mysql if is_readonly_plan(plan) => Some(query_timeout),
509 Channel::Postgres => Some(query_timeout),
510 _ => None,
511 }
512}
513
/// Bounds the remaining lifetime of `output` by `timeout`.
///
/// Fails immediately with `StatementTimeout` when no time is left. Affected-rows and
/// record-batch outputs are returned unchanged. Stream outputs are re-wrapped so that
/// the remaining budget is charged for the wall-clock time between successive polls.
/// This includes time the consumer spends after each yielded item, because the
/// generator is suspended at `yield` until it is polled again. Once the budget is
/// exhausted, the stream yields a `StreamTimeout` error. Inside `stream!`, `?` emits the
/// error as a stream item and ends the stream.
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    // Charge the elapsed time (including consumer time) to the budget.
                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    // `tokio::time::timeout` polls the inner future before checking the
                    // deadline, so a zero budget would not by itself stop a ready stream.
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
                span: Span::current(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}
550
551impl Instance {
552 #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
553 async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
554 if self.is_suspended() {
555 return vec![error::SuspendedSnafu {}.fail()];
556 }
557
558 let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
559 let query_interceptor = query_interceptor_opt.as_ref();
560 let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
561 Ok(q) => q,
562 Err(e) => return vec![Err(e)],
563 };
564
565 let checker_ref = self.plugins.get::<PermissionCheckerRef>();
566 let checker = checker_ref.as_ref();
567
568 match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
569 .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
570 {
571 Ok(stmts) => {
572 if stmts.is_empty() {
573 return vec![
574 InvalidSqlSnafu {
575 err_msg: "empty statements",
576 }
577 .fail(),
578 ];
579 }
580
581 let mut results = Vec::with_capacity(stmts.len());
582 for stmt in stmts {
583 if let Err(e) = checker
584 .check_permission(
585 query_ctx.current_user(),
586 PermissionReq::SqlStatement(&stmt),
587 )
588 .context(PermissionSnafu)
589 {
590 results.push(Err(e));
591 break;
592 }
593
594 match self.query_statement(stmt.clone(), query_ctx.clone()).await {
595 Ok(output) => {
596 let output_result =
597 query_interceptor.post_execute(output, query_ctx.clone());
598 results.push(output_result);
599 }
600 Err(e) => {
601 if e.status_code().should_log_error() {
602 error!(e; "Failed to execute query: {stmt}");
603 } else {
604 debug!("Failed to execute query: {stmt}, {e}");
605 }
606 results.push(Err(e));
607 break;
608 }
609 }
610 }
611 results
612 }
613 Err(e) => {
614 vec![Err(e)]
615 }
616 }
617 }
618
619 async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
620 self.query_engine
621 .execute(plan, query_ctx)
622 .await
623 .context(ExecLogicalPlanSnafu)
624 }
625
626 async fn exec_plan_with_timeout(
627 &self,
628 plan: LogicalPlan,
629 query_ctx: QueryContextRef,
630 ) -> Result<Output> {
631 let timeout = derive_timeout_for_plan(&plan, &query_ctx);
632 match timeout {
633 Some(timeout) => {
634 let start = tokio::time::Instant::now();
635 let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
636 .await
637 .map_err(|_| StatementTimeoutSnafu.build())??;
638 let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
639 attach_timeout(output, remaining_timeout)
640 }
641 None => self.exec_plan(plan, query_ctx).await,
642 }
643 }
644
645 async fn do_exec_plan_inner(
646 &self,
647 plan: LogicalPlan,
648 stmt: Option<Statement>,
649 query_ctx: QueryContextRef,
650 ) -> Result<Output> {
651 ensure!(!self.is_suspended(), error::SuspendedSnafu);
652
653 let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
654 let query_interceptor = query_interceptor_opt.as_ref();
655
656 query_interceptor.pre_execute(stmt.as_ref(), Some(&plan), query_ctx.clone())?;
657
658 let query = stmt
659 .as_ref()
660 .map(|s| s.to_string())
661 .unwrap_or_else(|| plan.display_indent().to_string());
662
663 let plan_is_readonly = is_readonly_plan(&plan);
664 let result = if should_track_plan_process(stmt.as_ref(), &plan) {
665 let slow_query_timer = if plan_is_readonly {
666 self.slow_query_options
667 .enable
668 .then(|| self.event_recorder.clone())
669 .flatten()
670 .map(|event_recorder| {
671 SlowQueryTimer::new(
672 CatalogQueryStatement::Plan(query.clone()),
673 self.slow_query_options.threshold,
674 self.slow_query_options.sample_ratio,
675 self.slow_query_options.record_type,
676 event_recorder,
677 )
678 })
679 } else {
680 None
681 };
682
683 let ticket = self.process_manager.register_query(
684 query_ctx.current_catalog().to_string(),
685 vec![query_ctx.current_schema()],
686 query,
687 query_ctx.conn_info().to_string(),
688 Some(query_ctx.process_id()),
689 slow_query_timer,
690 );
691
692 let query_fut = self.exec_plan_with_timeout(plan, query_ctx.clone());
693
694 CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
695 .await
696 .map_err(|_| error::CancelledSnafu.build())?
697 .map(|output| {
698 let Output { meta, data } = output;
699
700 let data = match data {
701 OutputData::Stream(stream) => OutputData::Stream(Box::pin(
702 CancellableStreamWrapper::new(stream, ticket),
703 )),
704 other => other,
705 };
706 Output { data, meta }
707 })
708 } else {
709 self.exec_plan_with_timeout(plan, query_ctx.clone()).await
710 };
711
712 result.and_then(|output| query_interceptor.post_execute(output, query_ctx))
713 }
714
715 #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
716 async fn do_promql_query_inner(
717 &self,
718 query: &PromQuery,
719 query_ctx: QueryContextRef,
720 ) -> Vec<Result<Output>> {
721 if self.is_suspended() {
722 return vec![error::SuspendedSnafu {}.fail()];
723 }
724
725 let result = PrometheusHandler::do_query(self, query, query_ctx)
727 .await
728 .with_context(|_| ExecutePromqlSnafu {
729 query: format!("{query:?}"),
730 });
731 vec![result]
732 }
733
734 async fn do_describe_inner(
735 &self,
736 stmt: Statement,
737 query_ctx: QueryContextRef,
738 ) -> Result<Option<DescribeResult>> {
739 ensure!(!self.is_suspended(), error::SuspendedSnafu);
740
741 let is_inner_plannable = |s: &Statement| {
745 matches!(
746 s,
747 Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
748 )
749 };
750 let plannable = is_inner_plannable(&stmt)
751 || matches!(&stmt, Statement::Explain(explain) if is_inner_plannable(explain.statement.as_ref()));
752
753 if plannable {
754 self.plugins
755 .get::<PermissionCheckerRef>()
756 .as_ref()
757 .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
758 .context(PermissionSnafu)?;
759
760 let plan = self
761 .query_engine
762 .planner()
763 .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
764 .await
765 .context(PlanStatementSnafu)?;
766 self.query_engine
767 .describe(plan, query_ctx)
768 .await
769 .map(Some)
770 .context(error::DescribeStatementSnafu)
771 } else {
772 Ok(None)
773 }
774 }
775
776 async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
777 self.catalog_manager
778 .schema_exists(catalog, schema, None)
779 .await
780 .context(error::CatalogSnafu)
781 }
782}
783
784#[async_trait]
785impl SqlQueryHandler for Instance {
786 async fn do_query(
787 &self,
788 query: &str,
789 query_ctx: QueryContextRef,
790 ) -> Vec<server_error::Result<Output>> {
791 self.do_query_inner(query, query_ctx)
792 .await
793 .into_iter()
794 .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
795 .collect()
796 }
797
798 async fn do_exec_plan(
799 &self,
800 plan: LogicalPlan,
801 stmt: Option<Statement>,
802 query_ctx: QueryContextRef,
803 ) -> server_error::Result<Output> {
804 self.do_exec_plan_inner(plan, stmt, query_ctx)
805 .await
806 .map_err(BoxedError::new)
807 .context(server_error::ExecutePlanSnafu)
808 }
809
810 async fn do_promql_query(
811 &self,
812 query: &PromQuery,
813 query_ctx: QueryContextRef,
814 ) -> Vec<server_error::Result<Output>> {
815 self.do_promql_query_inner(query, query_ctx)
816 .await
817 .into_iter()
818 .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
819 .collect()
820 }
821
822 async fn do_describe(
823 &self,
824 stmt: Statement,
825 query_ctx: QueryContextRef,
826 ) -> server_error::Result<Option<DescribeResult>> {
827 self.do_describe_inner(stmt, query_ctx)
828 .await
829 .map_err(BoxedError::new)
830 .context(server_error::DescribeStatementSnafu)
831 }
832
833 async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
834 self.is_valid_schema_inner(catalog, schema)
835 .await
836 .map_err(BoxedError::new)
837 .context(server_error::CheckDatabaseValiditySnafu)
838 }
839}
840
841pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
843 match output.data {
844 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
845 OutputData::Stream(stream) => {
846 let stream = OnDone::new(stream, move || {
847 timer.observe_duration();
848 });
849 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
850 }
851 }
852}
853
854#[async_trait]
855impl PrometheusHandler for Instance {
856 #[tracing::instrument(skip_all)]
857 async fn do_query(
858 &self,
859 query: &PromQuery,
860 query_ctx: QueryContextRef,
861 ) -> server_error::Result<Output> {
862 let interceptor = self
863 .plugins
864 .get::<PromQueryInterceptorRef<server_error::Error>>();
865
866 self.plugins
867 .get::<PermissionCheckerRef>()
868 .as_ref()
869 .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
870 .context(AuthSnafu)?;
871
872 let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
873 ParsePromQLSnafu {
874 query: query.clone(),
875 }
876 })?;
877
878 let plan = self
879 .statement_executor
880 .plan(&stmt, query_ctx.clone())
881 .await
882 .map_err(BoxedError::new)
883 .context(ExecuteQuerySnafu)?;
884
885 let QueryStatement::Promql(eval_stmt, _) = &stmt else {
886 unreachable!("query is parsed from promql");
887 };
888
889 interceptor.pre_execute(query, &eval_stmt.expr, Some(&plan), query_ctx.clone())?;
890
891 let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
893 CatalogQueryStatement::Promql(eval_stmt, alias)
894 } else {
895 return UnexpectedResultSnafu {
897 reason: "The query should always be promql.".to_string(),
898 }
899 .fail();
900 };
901 let raw_query = query_statement.to_string();
902
903 let slow_query_timer = self
904 .slow_query_options
905 .enable
906 .then(|| self.event_recorder.clone())
907 .flatten()
908 .map(|event_recorder| {
909 SlowQueryTimer::new(
910 query_statement,
911 self.slow_query_options.threshold,
912 self.slow_query_options.sample_ratio,
913 self.slow_query_options.record_type,
914 event_recorder,
915 )
916 });
917
918 let ticket = self.process_manager.register_query(
919 query_ctx.current_catalog().to_string(),
920 vec![query_ctx.current_schema()],
921 raw_query,
922 query_ctx.conn_info().to_string(),
923 Some(query_ctx.process_id()),
924 slow_query_timer,
925 );
926
927 let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
928
929 let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
930 .await
931 .map_err(|_| servers::error::CancelledSnafu.build())?
932 .map(|output| {
933 let Output { meta, data } = output;
934 let data = match data {
935 OutputData::Stream(stream) => {
936 OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
937 }
938 other => other,
939 };
940 Output { data, meta }
941 })
942 .map_err(BoxedError::new)
943 .context(ExecuteQuerySnafu)?;
944
945 Ok(interceptor.post_execute(output, query_ctx)?)
946 }
947
948 async fn query_metric_names(
949 &self,
950 matchers: Vec<Matcher>,
951 ctx: &QueryContextRef,
952 ) -> server_error::Result<Vec<String>> {
953 self.handle_query_metric_names(matchers, ctx)
954 .await
955 .map_err(BoxedError::new)
956 .context(ExecuteQuerySnafu)
957 }
958
959 async fn query_label_values(
960 &self,
961 metric: String,
962 label_name: String,
963 matchers: Vec<Matcher>,
964 start: SystemTime,
965 end: SystemTime,
966 ctx: &QueryContextRef,
967 ) -> server_error::Result<Vec<String>> {
968 self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
969 .await
970 .map_err(BoxedError::new)
971 .context(ExecuteQuerySnafu)
972 }
973
974 fn catalog_manager(&self) -> CatalogManagerRef {
975 self.catalog_manager.clone()
976 }
977}
978
/// Validates the optional `database` field of a SHOW-style statement against the
/// current catalog. On failure it returns early from the enclosing function via `?`.
macro_rules! validate_db_permission {
    ($statement: expr, $ctx: expr) => {
        if let Some(db) = $statement.database.as_ref() {
            validate_catalog_and_schema($ctx.current_catalog(), db, $ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
989
990pub fn check_permission(
991 plugins: Plugins,
992 stmt: &Statement,
993 query_ctx: &QueryContextRef,
994) -> Result<()> {
995 let need_validate = plugins
996 .get::<QueryOptions>()
997 .map(|opts| opts.disallow_cross_catalog_query)
998 .unwrap_or_default();
999
1000 if !need_validate {
1001 return Ok(());
1002 }
1003
1004 match stmt {
1005 Statement::Admin(_) => {}
1008 Statement::Query(_)
1010 | Statement::Explain(_)
1011 | Statement::Tql(_)
1012 | Statement::Delete(_)
1013 | Statement::DeclareCursor(_)
1014 | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
1015 Statement::CreateDatabase(_)
1017 | Statement::ShowDatabases(_)
1018 | Statement::DropDatabase(_)
1019 | Statement::AlterDatabase(_)
1020 | Statement::DropFlow(_)
1021 | Statement::Use(_) => {}
1022 #[cfg(feature = "enterprise")]
1023 Statement::DropTrigger(_) => {}
1024 Statement::ShowCreateDatabase(stmt) => {
1025 validate_database(&stmt.database_name, query_ctx)?;
1026 }
1027 Statement::ShowCreateTable(stmt) => {
1028 validate_param(&stmt.table_name, query_ctx)?;
1029 }
1030 Statement::ShowCreateFlow(stmt) => {
1031 validate_flow(&stmt.flow_name, query_ctx)?;
1032 }
1033 #[cfg(feature = "enterprise")]
1034 Statement::ShowCreateTrigger(stmt) => {
1035 validate_param(&stmt.trigger_name, query_ctx)?;
1036 }
1037 Statement::ShowCreateView(stmt) => {
1038 validate_param(&stmt.view_name, query_ctx)?;
1039 }
1040 Statement::CreateExternalTable(stmt) => {
1041 validate_param(&stmt.name, query_ctx)?;
1042 }
1043 Statement::CreateFlow(stmt) => {
1044 validate_param(&stmt.sink_table_name, query_ctx)?;
1046 }
1047 #[cfg(feature = "enterprise")]
1048 Statement::CreateTrigger(stmt) => {
1049 validate_param(&stmt.trigger_name, query_ctx)?;
1050 }
1051 Statement::CreateView(stmt) => {
1052 validate_param(&stmt.name, query_ctx)?;
1053 }
1054 Statement::AlterTable(stmt) => {
1055 validate_param(stmt.table_name(), query_ctx)?;
1056 }
1057 #[cfg(feature = "enterprise")]
1058 Statement::AlterTrigger(_) => {}
1059 Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
1061 Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
1063
1064 Statement::Comment(comment) => match &comment.object {
1065 CommentObject::Table(table) => validate_param(table, query_ctx)?,
1066 CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
1067 CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
1068 },
1069
1070 Statement::Insert(insert) => {
1071 let name = insert.table_name().context(ParseSqlSnafu)?;
1072 validate_param(name, query_ctx)?;
1073 }
1074 Statement::CreateTable(stmt) => {
1075 validate_param(&stmt.name, query_ctx)?;
1076 }
1077 Statement::CreateTableLike(stmt) => {
1078 validate_param(&stmt.table_name, query_ctx)?;
1079 validate_param(&stmt.source_name, query_ctx)?;
1080 }
1081 Statement::DropTable(drop_stmt) => {
1082 for table_name in drop_stmt.table_names() {
1083 validate_param(table_name, query_ctx)?;
1084 }
1085 }
1086 Statement::DropView(stmt) => {
1087 validate_param(&stmt.view_name, query_ctx)?;
1088 }
1089 Statement::ShowTables(stmt) => {
1090 validate_db_permission!(stmt, query_ctx);
1091 }
1092 Statement::ShowTableStatus(stmt) => {
1093 validate_db_permission!(stmt, query_ctx);
1094 }
1095 Statement::ShowColumns(stmt) => {
1096 validate_db_permission!(stmt, query_ctx);
1097 }
1098 Statement::ShowIndex(stmt) => {
1099 validate_db_permission!(stmt, query_ctx);
1100 }
1101 Statement::ShowRegion(stmt) => {
1102 validate_db_permission!(stmt, query_ctx);
1103 }
1104 Statement::ShowViews(stmt) => {
1105 validate_db_permission!(stmt, query_ctx);
1106 }
1107 Statement::ShowFlows(stmt) => {
1108 validate_db_permission!(stmt, query_ctx);
1109 }
1110 #[cfg(feature = "enterprise")]
1111 Statement::ShowTriggers(_stmt) => {
1112 }
1115 Statement::ShowStatus(_stmt) => {}
1116 Statement::ShowSearchPath(_stmt) => {}
1117 Statement::DescribeTable(stmt) => {
1118 validate_param(stmt.name(), query_ctx)?;
1119 }
1120 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
1121 CopyTable::To(copy_table_to) => validate_param(©_table_to.table_name, query_ctx)?,
1122 CopyTable::From(copy_table_from) => {
1123 validate_param(©_table_from.table_name, query_ctx)?
1124 }
1125 },
1126 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
1127 match copy_database {
1128 CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1129 CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1130 }
1131 }
1132 Statement::TruncateTable(stmt) => {
1133 validate_param(stmt.table_name(), query_ctx)?;
1134 }
1135 Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
1137 Statement::Kill(_) => {}
1139 Statement::ShowProcesslist(_) => {}
1141 }
1142 Ok(())
1143}
1144
1145fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1146 let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1147 .map_err(BoxedError::new)
1148 .context(ExternalSnafu)?;
1149
1150 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1151 .map_err(BoxedError::new)
1152 .context(SqlExecInterceptedSnafu)
1153}
1154
1155fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1156 let catalog = match &name.0[..] {
1157 [_flow] => query_ctx.current_catalog().to_string(),
1158 [catalog, _flow] => catalog.to_string_unquoted(),
1159 _ => {
1160 return InvalidSqlSnafu {
1161 err_msg: format!(
1162 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1163 ),
1164 }
1165 .fail();
1166 }
1167 };
1168
1169 let schema = query_ctx.current_schema();
1170
1171 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1172 .map_err(BoxedError::new)
1173 .context(SqlExecInterceptedSnafu)
1174}
1175
1176fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1177 let (catalog, schema) = match &name.0[..] {
1178 [schema] => (
1179 query_ctx.current_catalog().to_string(),
1180 schema.to_string_unquoted(),
1181 ),
1182 [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1183 _ => InvalidSqlSnafu {
1184 err_msg: format!(
1185 "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1186 ),
1187 }
1188 .fail()?,
1189 };
1190
1191 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1192 .map_err(BoxedError::new)
1193 .context(SqlExecInterceptedSnafu)
1194}
1195
1196fn is_readonly_plan(plan: &LogicalPlan) -> bool {
1197 !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
1198}
1199
1200fn should_track_statement_process(stmt: &Statement) -> bool {
1201 stmt.is_readonly()
1202 || matches!(stmt, Statement::Insert(insert) if insert.has_non_values_query_source())
1203}
1204
1205fn should_track_plan_process(stmt: Option<&Statement>, plan: &LogicalPlan) -> bool {
1206 is_readonly_plan(plan)
1207 || matches!(stmt, Some(Statement::Insert(insert)) if insert.has_non_values_query_source())
1208}
1209
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::task::{Context, Poll};
    use std::thread;
    use std::time::{Duration, Instant};

    use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
    use catalog::process_manager::ProcessManager;
    use common_base::Plugins;
    use common_error::ext::{BoxedError, PlainError};
    use common_error::status_code::StatusCode;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
    use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
    use common_meta::rpc::procedure::{
        MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
    };
    use common_query::Output;
    use common_recordbatch::{
        OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream,
    };
    use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion_expr::dml::InsertOp;
    use datafusion_expr::{LogicalPlanBuilder, LogicalTableSource};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema as GtSchema, SchemaRef as GtSchemaRef};
    use query::query_engine::options::QueryOptions;
    use session::context::{Channel, ConnInfo, QueryContext, QueryContextBuilder};
    use snafu::{Location, Snafu};
    use sql::dialect::GreptimeDbDialect;
    use store_api::data_source::DataSource;
    use store_api::storage::ScanRequest;
    use strfmt::Format;
    use table::metadata::{FilterPushDownType, TableInfo, TableInfoBuilder, TableMetaBuilder};
    use table::test_util::EmptyTable;
    use table::{Table, TableRef};
    use tokio::sync::{mpsc, oneshot};

    use super::*;
    use crate::frontend::FrontendOptions;
    use crate::instance::builder::FrontendBuilder;

    /// Errors raised by the test fixtures and helpers in this module.
    #[derive(Debug, Snafu)]
    enum TestError {
        #[snafu(display("Failed to build test cache registry"))]
        BuildCacheRegistry {
            source: cache::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to build test table meta for table: {table_name}"))]
        BuildTableMeta {
            table_name: String,
            source: table::metadata::TableMetaBuilderError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to build test table info for table: {table_name}"))]
        BuildTableInfo {
            table_name: String,
            source: table::metadata::TableInfoBuilderError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to register test table: {table_name}"))]
        RegisterTable {
            table_name: String,
            source: catalog::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to build test frontend instance"))]
        BuildFrontend {
            source: crate::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Expected exactly one output for SQL `{sql}`, got {actual}"))]
        UnexpectedOutputCount {
            sql: String,
            actual: usize,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to execute SQL `{sql}`"))]
        ExecuteSql {
            sql: String,
            source: crate::error::Error,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Timed out waiting for insert-select start notification"))]
        InsertStartTimeout {
            source: tokio::time::error::Elapsed,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Insert-select start notification channel closed"))]
        InsertStartChannelClosed {
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Failed to release blocking insert-select interceptor"))]
        ReleaseBlockedInsert {
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Timed out waiting for insert-select source to be polled"))]
        SourcePollTimeout {
            source: tokio::time::error::Elapsed,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Insert-select source poll notification channel closed"))]
        SourcePollChannelClosed {
            source: oneshot::error::RecvError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Timed out waiting for insert task to finish"))]
        InsertTaskTimeout {
            source: tokio::time::error::Elapsed,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Insert task panicked"))]
        InsertTaskPanic {
            source: tokio::task::JoinError,
            #[snafu(implicit)]
            location: Location,
        },

        #[snafu(display("Expected insert-select to be cancelled"))]
        InsertSelectNotCancelled {
            #[snafu(implicit)]
            location: Location,
        },
    }

    /// Result alias used by the async test helpers in this module.
    type TestResult<T> = std::result::Result<T, TestError>;

    /// Parses `sql` with the GreptimeDB dialect and returns its first statement.
    ///
    /// Panics if parsing fails or produces no statements.
    fn parse_one_sql(sql: &str) -> Statement {
        parse_stmt(sql, &GreptimeDbDialect {}).unwrap().remove(0)
    }

    /// Builds a MySQL-channel query context carrying `process_id`, so a test can later
    /// target the query it runs with `KILL QUERY <process_id>`.
    fn test_query_ctx(process_id: u32) -> QueryContextRef {
        Arc::new(
            QueryContextBuilder::default()
                .channel(Channel::Mysql)
                .conn_info(ConnInfo::new(None, Channel::Mysql))
                .process_id(process_id)
                .build(),
        )
    }

    /// Interceptor that parks the first `INSERT ... <query>` statement in `pre_execute`.
    ///
    /// It signals `started_tx` once the insert reaches the interceptor. It then blocks the
    /// executing worker thread until `finish_rx` fires, which keeps the insert "running"
    /// while the test inspects it. Only one insert can be blocked per instance.
    struct BlockingInsertSelectInterceptor {
        started_tx: mpsc::UnboundedSender<()>,
        finish_rx: std::sync::Mutex<Option<oneshot::Receiver<()>>>,
    }

    impl BlockingInsertSelectInterceptor {
        fn new(started_tx: mpsc::UnboundedSender<()>, finish_rx: oneshot::Receiver<()>) -> Self {
            Self {
                started_tx,
                finish_rx: std::sync::Mutex::new(Some(finish_rx)),
            }
        }
    }

    impl SqlQueryInterceptor for BlockingInsertSelectInterceptor {
        type Error = Error;

        fn pre_execute(
            &self,
            statement: Option<&Statement>,
            _plan: Option<&LogicalPlan>,
            _query_ctx: QueryContextRef,
        ) -> Result<()> {
            let Some(Statement::Insert(insert)) = statement else {
                return Ok(());
            };
            if !insert.has_non_values_query_source() {
                return Ok(());
            }

            let finish_rx = self
                .finish_rx
                .lock()
                .unwrap()
                .take()
                .expect("BlockingInsertSelectInterceptor can only block a single insert-select");
            let _ = self.started_tx.send(());
            // `pre_execute` is synchronous, so block the worker in place. This requires a
            // multi-threaded runtime, which the tests using this interceptor declare.
            tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current()
                    .block_on(finish_rx)
                    .unwrap();
            });
            Ok(())
        }
    }

    /// Record batch stream that never yields a batch.
    ///
    /// On its first poll it notifies `polled_tx`. It then stays pending because it owns
    /// `_finish_tx`, the sender paired with its own `finish_rx`. That receiver cannot
    /// complete while the stream is alive, so the query reading this stream only ends when
    /// it is cancelled.
    struct PendingRecordBatchStream {
        schema: GtSchemaRef,
        polled_tx: Option<oneshot::Sender<()>>,
        _finish_tx: oneshot::Sender<()>,
        finish_rx: Pin<Box<oneshot::Receiver<()>>>,
    }

    impl RecordBatchStream for PendingRecordBatchStream {
        fn schema(&self) -> GtSchemaRef {
            self.schema.clone()
        }

        fn output_ordering(&self) -> Option<&[OrderOption]> {
            None
        }

        fn metrics(&self) -> Option<common_recordbatch::adapter::RecordBatchMetrics> {
            None
        }
    }

    impl Stream for PendingRecordBatchStream {
        type Item = common_recordbatch::error::Result<RecordBatch>;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            if let Some(polled_tx) = self.polled_tx.take() {
                let _ = polled_tx.send(());
            }

            match self.finish_rx.as_mut().poll(cx) {
                Poll::Ready(_) => Poll::Ready(None),
                Poll::Pending => Poll::Pending,
            }
        }
    }

    impl Unpin for PendingRecordBatchStream {}

    /// Data source whose streams are `PendingRecordBatchStream`s.
    ///
    /// The poll notifier is handed to the first stream created; later streams get none.
    struct PendingDataSource {
        schema: GtSchemaRef,
        polled_tx: std::sync::Mutex<Option<oneshot::Sender<()>>>,
    }

    impl DataSource for PendingDataSource {
        fn get_stream(
            &self,
            _request: ScanRequest,
        ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
            let (finish_tx, finish_rx) = oneshot::channel();
            let mut polled_tx = self.polled_tx.lock().map_err(|_| {
                BoxedError::new(PlainError::new(
                    "pending data source lock poisoned".to_string(),
                    StatusCode::Unexpected,
                ))
            })?;
            Ok(Box::pin(PendingRecordBatchStream {
                schema: self.schema.clone(),
                polled_tx: polled_tx.take(),
                _finish_tx: finish_tx,
                finish_rx: Box::pin(finish_rx),
            }))
        }
    }

    /// Procedure executor stub that rejects every request as `Unsupported`.
    struct NoopProcedureExecutor;

    #[async_trait::async_trait]
    impl ProcedureExecutor for NoopProcedureExecutor {
        async fn submit_ddl_task(
            &self,
            _ctx: &ExecutorContext,
            _request: SubmitDdlTaskRequest,
        ) -> common_meta::error::Result<SubmitDdlTaskResponse> {
            common_meta::error::UnsupportedSnafu {
                operation: "submit_ddl_task",
            }
            .fail()
        }

        async fn migrate_region(
            &self,
            _ctx: &ExecutorContext,
            _request: MigrateRegionRequest,
        ) -> common_meta::error::Result<MigrateRegionResponse> {
            common_meta::error::UnsupportedSnafu {
                operation: "migrate_region",
            }
            .fail()
        }

        async fn reconcile(
            &self,
            _ctx: &ExecutorContext,
            _request: ReconcileRequest,
        ) -> common_meta::error::Result<ReconcileResponse> {
            common_meta::error::UnsupportedSnafu {
                operation: "reconcile",
            }
            .fail()
        }

        async fn query_procedure_state(
            &self,
            _ctx: &ExecutorContext,
            _pid: &str,
        ) -> common_meta::error::Result<ProcedureStateResponse> {
            common_meta::error::UnsupportedSnafu {
                operation: "query_procedure_state",
            }
            .fail()
        }

        async fn list_procedures(
            &self,
            _ctx: &ExecutorContext,
        ) -> common_meta::error::Result<ProcedureDetailResponse> {
            common_meta::error::UnsupportedSnafu {
                operation: "list_procedures",
            }
            .fail()
        }
    }

    /// Builds the default composite cache registry on top of `kv_backend`.
    fn test_cache_registry(
        kv_backend: common_meta::kv_backend::KvBackendRef,
    ) -> TestResult<common_meta::cache::LayeredCacheRegistryRef> {
        Ok(Arc::new(
            cache::with_default_composite_cache_registry(
                LayeredCacheRegistryBuilder::default()
                    .add_cache_registry(cache::build_fundamental_cache_registry(kv_backend)),
            )
            .context(BuildCacheRegistrySnafu)?
            .build(),
        ))
    }

    /// Table info for a two-column table: `id INT` as primary key and a millisecond
    /// timestamp `ts` as time index.
    fn test_table_info(table_id: u32, table_name: &str) -> TestResult<TableInfo> {
        let schema = Arc::new(GtSchema::new(vec![
            ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ]));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![1])
            .next_column_id(1024)
            .build()
            .with_context(|_| BuildTableMetaSnafu {
                table_name: table_name.to_string(),
            })?;

        TableInfoBuilder::new(table_name, table_meta)
            .table_id(table_id)
            .build()
            .with_context(|_| BuildTableInfoSnafu {
                table_name: table_name.to_string(),
            })
    }

    /// An `EmptyTable` with the `test_table_info` schema.
    fn test_table(table_id: u32, table_name: &str) -> TestResult<table::TableRef> {
        let table_info = test_table_info(table_id, table_name)?;
        Ok(EmptyTable::from_table_info(&table_info))
    }

    /// A table whose scans never finish; `polled_tx` fires when the first scan is polled.
    fn pending_table(
        table_id: u32,
        table_name: &str,
        polled_tx: oneshot::Sender<()>,
    ) -> TestResult<table::TableRef> {
        let table_info = test_table_info(table_id, table_name)?;
        let data_source = Arc::new(PendingDataSource {
            schema: table_info.meta.schema.clone(),
            polled_tx: std::sync::Mutex::new(Some(polled_tx)),
        });

        Ok(Arc::new(Table::new(
            Arc::new(table_info),
            FilterPushDownType::Unsupported,
            data_source,
        )))
    }

    /// Frontend instance over the given source/target tables, with no extra plugins.
    async fn test_instance_with_tables(
        source_table: TableRef,
        target_table: TableRef,
    ) -> TestResult<Instance> {
        test_instance_with_plugins(source_table, target_table, Plugins::new()).await
    }

    /// Frontend instance with empty `source`/`target` tables and `interceptor` installed
    /// as the SQL query interceptor plugin.
    async fn test_instance_with_insert_select_interceptor(
        interceptor: SqlQueryInterceptorRef<Error>,
    ) -> TestResult<Instance> {
        let plugins = Plugins::new();
        plugins.insert::<SqlQueryInterceptorRef<Error>>(interceptor);

        test_instance_with_plugins(
            test_table(1024, "source")?,
            test_table(1025, "target")?,
            plugins,
        )
        .await
    }

    /// Builds a frontend instance backed by an in-memory catalog.
    ///
    /// `source_table` is registered through `MemoryCatalogManager::new_with_table`.
    /// `target_table` is registered as `greptime.public.target` with id 1025, so callers
    /// should build it with that id and name. The process list table is registered
    /// against a fresh `ProcessManager` named `test-frontend`, and DDL procedures go to
    /// `NoopProcedureExecutor`.
    async fn test_instance_with_plugins(
        source_table: TableRef,
        target_table: TableRef,
        plugins: Plugins,
    ) -> TestResult<Instance> {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let process_manager = Arc::new(ProcessManager::new("test-frontend".to_string(), None));
        let catalog_manager = catalog::memory::MemoryCatalogManager::new_with_table(source_table);
        let target_table_name = "target";
        catalog_manager
            .register_table_sync(catalog::RegisterTableRequest {
                catalog: "greptime".to_string(),
                schema: "public".to_string(),
                table_name: target_table_name.to_string(),
                table_id: 1025,
                table: target_table,
            })
            .with_context(|_| RegisterTableSnafu {
                table_name: target_table_name.to_string(),
            })?;
        catalog_manager.register_process_list_table(process_manager.clone());

        let cache_registry = test_cache_registry(kv_backend.clone())?;

        FrontendBuilder::new(
            FrontendOptions::default(),
            kv_backend,
            cache_registry,
            catalog_manager,
            Arc::new(client::client_manager::NodeClients::default()),
            Arc::new(NoopProcedureExecutor),
            process_manager,
        )
        .with_plugin(plugins)
        .try_build()
        .await
        .context(BuildFrontendSnafu)
    }

    /// Runs `sql` through `do_query_inner` and returns its single output.
    ///
    /// Fails with `UnexpectedOutputCount` if the SQL produced a different number of
    /// outputs, or `ExecuteSql` if that output is an error.
    async fn execute_one_sql(
        instance: &Instance,
        sql: &str,
        query_ctx: QueryContextRef,
    ) -> TestResult<Output> {
        let mut results = instance.do_query_inner(sql, query_ctx).await;
        ensure!(
            results.len() == 1,
            UnexpectedOutputCountSnafu {
                sql: sql.to_string(),
                actual: results.len(),
            }
        );
        results.remove(0).with_context(|_| ExecuteSqlSnafu {
            sql: sql.to_string(),
        })
    }

    /// Checks `fast_legacy_check` semantics, then hammers a shared cache from several
    /// threads.
    ///
    /// The concurrent phase is guarded by a watchdog, so a deadlock fails the test
    /// instead of hanging it.
    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        let cache = DashMap::new();

        cache.insert("metric1".to_string(), true);
        cache.insert("metric2".to_string(), false);
        cache.insert("metric3".to_string(), true);

        // A cached name plus an unknown one: the cached value is returned and the
        // unknown name is backfilled with it.
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true));
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // No cached names at all: the cache cannot decide.
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Cached names that disagree with each other are rejected.
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true);
        cache_incompatible.insert("metric2".to_string(), false);
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err());

        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));
        // Watchdog channel: nothing is ever sent on it. Each worker owns a sender clone
        // that is dropped however the worker exits (normally, early return, or panic).
        // The receiver therefore sees `Disconnected` once every worker is gone, and a
        // timeout means some worker is still stuck.
        let (done_tx, done_rx) = std::sync::mpsc::channel::<()>();

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);
                let done_guard = done_tx.clone();

                thread::spawn(move || {
                    // Bind by value so the sender is moved in and dropped on thread exit.
                    let _done_guard = done_guard;
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();
        // Only the workers may keep the watchdog channel open.
        drop(done_tx);

        // `JoinHandle::join` has no timeout, so measuring elapsed time after joining can
        // never catch a hang. Wait on the watchdog instead, and only join once every
        // worker is known to have exited.
        if let Err(std::sync::mpsc::RecvTimeoutError::Timeout) =
            done_rx.recv_timeout(Duration::from_secs(30))
        {
            panic!("Test timed out - possible deadlock detected!");
        }

        for (i, handle) in handles.into_iter().enumerate() {
            if handle.join().is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    #[test]
    fn test_should_track_statement_process() {
        assert!(should_track_statement_process(&parse_one_sql(
            "SELECT * FROM demo"
        )));
        assert!(should_track_statement_process(&parse_one_sql(
            "INSERT INTO demo SELECT * FROM source"
        )));
        assert!(!should_track_statement_process(&parse_one_sql(
            "INSERT INTO demo VALUES (1)"
        )));
        assert!(!should_track_statement_process(&parse_one_sql(
            "INSERT INTO demo VALUES (now())"
        )));
    }

    #[test]
    fn test_should_track_plan_process() {
        let select_stmt = parse_one_sql("SELECT * FROM demo");
        let insert_select_stmt = parse_one_sql("INSERT INTO demo SELECT * FROM source");
        let insert_values_stmt = parse_one_sql("INSERT INTO demo VALUES (now())");

        let empty_plan = LogicalPlanBuilder::empty(false).build().unwrap();
        assert!(should_track_plan_process(Some(&select_stmt), &empty_plan));
        assert!(should_track_plan_process(
            Some(&insert_select_stmt),
            &insert_dml_plan()
        ));
        assert!(!should_track_plan_process(
            Some(&insert_values_stmt),
            &insert_dml_plan()
        ));
        assert!(!should_track_plan_process(None, &insert_dml_plan()));
    }

    /// A blocked `INSERT ... SELECT` must be listed by `SHOW PROCESSLIST` while it runs.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_insert_select_is_visible_in_show_processlist() -> TestResult<()> {
        let insert_sql = "INSERT INTO target SELECT * FROM source";
        let (started_tx, mut started_rx) = mpsc::unbounded_channel();
        let (finish_tx, finish_rx) = oneshot::channel();
        let interceptor = Arc::new(BlockingInsertSelectInterceptor::new(started_tx, finish_rx));
        let instance = Arc::new(test_instance_with_insert_select_interceptor(interceptor).await?);

        let insert_task = tokio::spawn({
            let instance = instance.clone();
            async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await }
        });

        tokio::time::timeout(Duration::from_secs(5), started_rx.recv())
            .await
            .context(InsertStartTimeoutSnafu)?
            .context(InsertStartChannelClosedSnafu)?;

        let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43)).await?;
        let process_list = output.data.pretty_print().await;
        assert!(
            process_list.contains(insert_sql),
            "process list did not contain running insert:\n{process_list}"
        );

        finish_tx
            .send(())
            .map_err(|_| ReleaseBlockedInsertSnafu.build())?;
        insert_task.await.context(InsertTaskPanicSnafu)??;

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_kill_query_cancels_insert_select() -> TestResult<()> {
        assert_kill_cancels_insert_select("KILL QUERY 4242").await
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_kill_process_id_cancels_insert_select() -> TestResult<()> {
        assert_kill_cancels_insert_select("KILL 'test-frontend/4242'").await
    }

    /// Starts an `INSERT ... SELECT` over a never-ending source, kills it with `kill_sql`,
    /// and checks that the insert fails with `StatusCode::Cancelled` and is no longer
    /// listed by `SHOW PROCESSLIST`.
    async fn assert_kill_cancels_insert_select(kill_sql: &str) -> TestResult<()> {
        let insert_sql = "INSERT INTO target SELECT * FROM source";
        let (source_polled_tx, source_polled_rx) = oneshot::channel();
        let instance = Arc::new(
            test_instance_with_tables(
                pending_table(1024, "source", source_polled_tx)?,
                test_table(1025, "target")?,
            )
            .await?,
        );

        let insert_task = tokio::spawn({
            let instance = instance.clone();
            async move { execute_one_sql(&instance, insert_sql, test_query_ctx(4242)).await }
        });

        // Wait until the source scan is actually being polled, i.e. the insert is running.
        tokio::time::timeout(Duration::from_secs(5), source_polled_rx)
            .await
            .context(SourcePollTimeoutSnafu)?
            .context(SourcePollChannelClosedSnafu)?;

        let output = execute_one_sql(&instance, kill_sql, test_query_ctx(43)).await?;
        assert!(matches!(output.data, OutputData::AffectedRows(1)));

        let insert_result = tokio::time::timeout(Duration::from_secs(5), insert_task)
            .await
            .context(InsertTaskTimeoutSnafu)?
            .context(InsertTaskPanicSnafu)?;
        let err = match insert_result {
            Ok(_) => return InsertSelectNotCancelledSnafu.fail(),
            Err(TestError::ExecuteSql { source, .. }) => source,
            Err(err) => return Err(err),
        };
        assert_eq!(StatusCode::Cancelled, err.status_code());

        let output = execute_one_sql(&instance, "SHOW PROCESSLIST", test_query_ctx(43)).await?;
        let process_list = output.data.pretty_print().await;
        assert!(
            !process_list.contains(insert_sql),
            "process list still contains killed insert:\n{process_list}"
        );

        Ok(())
    }

    /// A DML `INSERT INTO demo` plan over an empty input, used as a non-read-only plan.
    fn insert_dml_plan() -> LogicalPlan {
        let schema = SchemaRef::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            true,
        )]));
        let target = Arc::new(LogicalTableSource::new(schema));
        let input = LogicalPlanBuilder::empty(false).build().unwrap();

        LogicalPlanBuilder::insert_into(input, "demo", target, InsertOp::Append)
            .unwrap()
            .build()
            .unwrap()
    }

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        // Expands `template_sql` with allowed and disallowed catalog/schema prefixes and
        // checks that permission validation accepts / rejects each accordingly.
        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        let comment_flow_cases = [
            ("COMMENT ON FLOW my_flow IS 'comment';", true),
            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
        ];
        for (sql, is_ok) in comment_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }

        let show_flow_cases = [
            ("SHOW CREATE FLOW my_flow;", true),
            ("SHOW CREATE FLOW greptime.my_flow;", true),
            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
        ];
        for (sql, is_ok) in show_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }
    }
}