1use std::collections::HashMap;
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::sync::{Arc, RwLock};
19
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_base::Plugins;
23use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
24use common_function::function_factory::ScalarFunctionFactory;
25use common_function::function_registry::FUNCTION_REGISTRY;
26use common_function::handlers::{
27 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
28};
29use common_function::state::FunctionState;
30use common_stat::get_total_memory_bytes;
31use common_telemetry::warn;
32use datafusion::catalog::TableFunction;
33use datafusion::dataframe::DataFrame;
34use datafusion::error::Result as DfResult;
35use datafusion::execution::SessionStateBuilder;
36use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
37use datafusion::execution::memory_pool::{
38 GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
39 TrackConsumersPool,
40};
41use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
42use datafusion::physical_optimizer::PhysicalOptimizerRule;
43use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
44use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
45use datafusion::physical_plan::ExecutionPlan;
46use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
47use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan, WindowUDF};
48use datafusion_optimizer::Analyzer;
49use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites;
50use datafusion_optimizer::optimizer::Optimizer;
51use partition::manager::PartitionRuleManagerRef;
52use promql::extension_plan::PromExtensionPlanner;
53use session::context::QueryContextRef;
54use table::TableRef;
55use table::table::adapter::DfTableProviderAdapter;
56
57use crate::QueryEngineContext;
58use crate::dist_plan::{
59 DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, DynFilterRegistryManager,
60 MergeSortExtensionPlanner, RemoteDynFilterReceiverExtensionPlanner,
61 RemoteDynFilterRegistryLease,
62};
63use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
64use crate::optimizer::ExtensionAnalyzerRule;
65use crate::optimizer::const_normalization::ConstNormalizationRule;
66use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
67use crate::optimizer::count_nest_aggr::CountNestAggrRule;
68use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
69use crate::optimizer::json_type_concretize::JsonTypeConcretizeRule;
70use crate::optimizer::parallelize_scan::ParallelizeScan;
71use crate::optimizer::pass_distribution::PassDistribution;
72use crate::optimizer::promql_tsid_narrow_join::PromqlTsidNarrowJoin;
73use crate::optimizer::remove_duplicate::RemoveDuplicate;
74use crate::optimizer::scan_hint::ScanHintRule;
75use crate::optimizer::string_normalization::StringNormalizationRule;
76use crate::optimizer::transcribe_atat::TranscribeAtatRule;
77use crate::optimizer::type_conversion::TypeConversionRule;
78use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
79use crate::options::QueryOptions as QueryOptionsNew;
80use crate::query_engine::DefaultSerializer;
81use crate::query_engine::options::QueryOptions;
82use crate::range_select::planner::RangeSelectPlanner;
83use crate::region_query::RegionQueryHandlerRef;
84
85#[derive(Clone)]
87pub struct QueryEngineState {
88 df_context: SessionContext,
89 catalog_manager: CatalogManagerRef,
90 dyn_filter_registry_manager: Arc<DynFilterRegistryManager>,
91 function_state: Arc<FunctionState>,
92 scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
93 aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
94 table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
95 extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
96 plugins: Plugins,
97}
98
99impl fmt::Debug for QueryEngineState {
100 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
101 f.debug_struct("QueryEngineState")
102 .field("state", &self.df_context.state())
103 .finish()
104 }
105}
106
107impl QueryEngineState {
108 #[allow(clippy::too_many_arguments)]
109 pub fn new(
110 catalog_list: CatalogManagerRef,
111 partition_rule_manager: Option<PartitionRuleManagerRef>,
112 region_query_handler: Option<RegionQueryHandlerRef>,
113 table_mutation_handler: Option<TableMutationHandlerRef>,
114 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
115 flow_service_handler: Option<FlowServiceHandlerRef>,
116 with_dist_planner: bool,
117 plugins: Plugins,
118 options: QueryOptionsNew,
119 ) -> Self {
120 let total_memory = get_total_memory_bytes().max(0) as u64;
121 let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
122 let runtime_env = if memory_pool_size > 0 {
123 Arc::new(
124 RuntimeEnvBuilder::new()
125 .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
126 .build()
127 .expect("Failed to build RuntimeEnv"),
128 )
129 } else {
130 Arc::new(RuntimeEnv::default())
131 };
132 let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
133 if options.parallelism > 0 {
134 session_config = session_config.with_target_partitions(options.parallelism);
135 }
136 if options.allow_query_fallback {
137 session_config
138 .options_mut()
139 .extensions
140 .insert(DistPlannerOptions {
141 allow_query_fallback: true,
142 });
143 }
144
145 session_config
148 .options_mut()
149 .execution
150 .skip_physical_aggregate_schema_check = true;
151
152 let mut extension_rules = Vec::new();
154
155 extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
157 extension_rules.push(Arc::new(CountNestAggrRule) as _);
158
159 let mut analyzer = Analyzer::new();
161 analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
162 analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
163 analyzer
164 .rules
165 .insert(0, Arc::new(CountWildcardToTimeIndexRule));
166 analyzer.rules.push(Arc::new(ConstNormalizationRule));
167
168 analyzer.rules.insert(
172 0,
173 Arc::new(ApplyFunctionRewrites::new(
174 FUNCTION_REGISTRY.function_rewrites(),
175 )),
176 );
177
178 if with_dist_planner {
179 analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
180 }
181 analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));
182
183 let mut optimizer = Optimizer::new();
184 optimizer.rules.push(Arc::new(ScanHintRule));
185 optimizer.rules.push(Arc::new(JsonTypeConcretizeRule));
186
187 let mut physical_optimizer = PhysicalOptimizer::new();
189 physical_optimizer
191 .rules
192 .insert(5, Arc::new(ParallelizeScan));
193 physical_optimizer
195 .rules
196 .insert(6, Arc::new(PassDistribution));
197 physical_optimizer
199 .rules
200 .insert(7, Arc::new(PromqlTsidNarrowJoin));
201 physical_optimizer.rules.insert(
203 8,
204 Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
205 );
206 physical_optimizer
208 .rules
209 .push(Arc::new(WindowedSortPhysicalRule));
210 physical_optimizer
215 .rules
216 .push(Arc::new(MatchesConstantTermOptimizer));
217 physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
219 Self::remove_physical_optimizer_rule(
221 &mut physical_optimizer.rules,
222 SanityCheckPlan {}.name(),
223 );
224 physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));
225
226 let session_state = SessionStateBuilder::new()
227 .with_config(session_config)
228 .with_runtime_env(runtime_env)
229 .with_default_features()
230 .with_analyzer_rules(analyzer.rules)
231 .with_serializer_registry(Arc::new(DefaultSerializer))
232 .with_query_planner(Arc::new(DfQueryPlanner::new(
233 catalog_list.clone(),
234 partition_rule_manager,
235 region_query_handler.clone(),
236 options.enable_per_region_metrics,
237 )))
238 .with_optimizer_rules(optimizer.rules)
239 .with_physical_optimizer_rules(physical_optimizer.rules)
240 .build();
241
242 let df_context = SessionContext::new_with_state(session_state);
243 register_function_aliases(&df_context);
244
245 Self {
246 df_context,
247 catalog_manager: catalog_list,
248 dyn_filter_registry_manager: Arc::new(DynFilterRegistryManager::default()),
249 function_state: Arc::new(FunctionState {
250 table_mutation_handler,
251 procedure_service_handler,
252 flow_service_handler,
253 }),
254 aggr_functions: Arc::new(RwLock::new(HashMap::new())),
255 table_functions: Arc::new(RwLock::new(HashMap::new())),
256 extension_rules,
257 plugins,
258 scalar_functions: Arc::new(RwLock::new(HashMap::new())),
259 }
260 }
261
262 fn remove_physical_optimizer_rule(
263 rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
264 name: &str,
265 ) {
266 rules.retain(|rule| rule.name() != name);
267 }
268
269 pub fn optimize_by_extension_rules(
271 &self,
272 plan: DfLogicalPlan,
273 context: &QueryEngineContext,
274 ) -> DfResult<DfLogicalPlan> {
275 self.extension_rules
276 .iter()
277 .try_fold(plan, |acc_plan, rule| {
278 rule.analyze(acc_plan, context, self.session_state().config_options())
279 })
280 }
281
282 pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
284 self.session_state().optimize(&plan)
285 }
286
287 pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
289 self.scalar_functions
290 .read()
291 .unwrap()
292 .get(function_name)
293 .cloned()
294 }
295
296 pub fn scalar_names(&self) -> Vec<String> {
298 self.scalar_functions
299 .read()
300 .unwrap()
301 .keys()
302 .cloned()
303 .collect()
304 }
305
306 pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
308 self.aggr_functions
309 .read()
310 .unwrap()
311 .get(function_name)
312 .cloned()
313 }
314
315 pub fn aggr_names(&self) -> Vec<String> {
317 self.aggr_functions
318 .read()
319 .unwrap()
320 .keys()
321 .cloned()
322 .collect()
323 }
324
325 pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
327 self.table_functions
328 .read()
329 .unwrap()
330 .get(function_name)
331 .cloned()
332 }
333
334 pub fn table_function_names(&self) -> Vec<String> {
336 self.table_functions
337 .read()
338 .unwrap()
339 .keys()
340 .cloned()
341 .collect()
342 }
343
344 pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
347 let name = func.name().to_string();
348 let x = self
349 .scalar_functions
350 .write()
351 .unwrap()
352 .insert(name.clone(), func);
353
354 if x.is_some() {
355 warn!("Already registered scalar function '{name}'");
356 }
357 }
358
359 pub fn register_aggr_function(&self, func: AggregateUDF) {
368 let name = func.name().to_string();
369 let x = self
370 .aggr_functions
371 .write()
372 .unwrap()
373 .insert(name.clone(), func);
374 assert!(
375 x.is_none(),
376 "Already registered aggregate function '{name}'"
377 );
378 }
379
380 pub fn register_table_function(&self, func: Arc<TableFunction>) {
381 let name = func.name();
382 let x = self
383 .table_functions
384 .write()
385 .unwrap()
386 .insert(name.to_string(), func.clone());
387
388 if x.is_some() {
389 warn!("Already registered table function '{name}'");
390 }
391 }
392
393 pub fn register_window_function(&self, func: WindowUDF) {
398 self.df_context.register_udwf(func);
399 }
400
401 pub fn catalog_manager(&self) -> &CatalogManagerRef {
402 &self.catalog_manager
403 }
404
405 pub fn dyn_filter_registry_manager(&self) -> Arc<DynFilterRegistryManager> {
406 self.dyn_filter_registry_manager.clone()
407 }
408
409 pub fn acquire_remote_dyn_filter_registry_lease(
410 &self,
411 query_ctx: &QueryContextRef,
412 ) -> Option<RemoteDynFilterRegistryLease> {
413 let query_id = query_ctx.remote_query_id_value()?;
414 Some(
415 self.dyn_filter_registry_manager
416 .clone()
417 .acquire_lease(query_id),
418 )
419 }
420
421 pub fn function_state(&self) -> Arc<FunctionState> {
422 self.function_state.clone()
423 }
424
425 pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
427 self.function_state.table_mutation_handler.as_ref()
428 }
429
430 pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
432 self.function_state.procedure_service_handler.as_ref()
433 }
434
435 pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
436 self.plugins
437 .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
438 .unwrap_or(false)
439 }
440
441 pub fn session_state(&self) -> SessionState {
442 self.df_context.state()
443 }
444
445 pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
447 self.df_context
448 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
449 }
450}
451
452struct DfQueryPlanner {
453 physical_planner: DefaultPhysicalPlanner,
454}
455
456impl fmt::Debug for DfQueryPlanner {
457 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
458 f.debug_struct("DfQueryPlanner").finish()
459 }
460}
461
462#[async_trait]
463impl QueryPlanner for DfQueryPlanner {
464 async fn create_physical_plan(
465 &self,
466 logical_plan: &DfLogicalPlan,
467 session_state: &SessionState,
468 ) -> DfResult<Arc<dyn ExecutionPlan>> {
469 self.physical_planner
470 .create_physical_plan(logical_plan, session_state)
471 .await
472 }
473}
474
/// `(target, alias)` pairs for scalar functions: each `alias` is registered as an extra
/// name for the scalar function called `target`, if the session has one.
const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[
    // (target, alias)
    ("upper", "ucase"),
    ("lower", "lcase"),
    ("ceil", "ceiling"),
    ("substr", "mid"),
    ("random", "rand"),
];
483
/// `(target, alias)` pairs for aggregate functions: each `alias` is registered as an extra
/// name for the aggregate function called `target`, if the session has one.
const AGGREGATE_FUNCTION_ALIASES: &[(&str, &str)] = &[
    // (target, alias)
    ("stddev_pop", "std"),
    ("var_pop", "variance"),
];
487
488fn register_function_aliases(ctx: &SessionContext) {
493 let state = ctx.state();
494
495 for (target, alias) in SCALAR_FUNCTION_ALIASES {
496 if let Some(func) = state.scalar_functions().get(*target) {
497 let aliased = func.as_ref().clone().with_aliases([*alias]);
498 ctx.register_udf(aliased);
499 }
500 }
501
502 for (target, alias) in AGGREGATE_FUNCTION_ALIASES {
503 if let Some(func) = state.aggregate_functions().get(*target) {
504 let aliased = func.as_ref().clone().with_aliases([*alias]);
505 ctx.register_udaf(aliased);
506 }
507 }
508}
509
510impl DfQueryPlanner {
511 fn new(
512 catalog_manager: CatalogManagerRef,
513 partition_rule_manager: Option<PartitionRuleManagerRef>,
514 region_query_handler: Option<RegionQueryHandlerRef>,
515 enable_per_region_metrics: bool,
516 ) -> Self {
517 let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> = vec![
518 Arc::new(PromExtensionPlanner),
519 Arc::new(RangeSelectPlanner),
520 Arc::new(RemoteDynFilterReceiverExtensionPlanner),
521 ];
522 if let (Some(region_query_handler), Some(partition_rule_manager)) =
523 (region_query_handler, partition_rule_manager)
524 {
525 planners.push(Arc::new(DistExtensionPlanner::new(
526 catalog_manager,
527 partition_rule_manager,
528 region_query_handler,
529 enable_per_region_metrics,
530 )));
531 planners.push(Arc::new(MergeSortExtensionPlanner {}));
532 }
533 Self {
534 physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
535 }
536 }
537}
538
539#[derive(Debug)]
544struct MetricsMemoryPool {
545 inner: Arc<TrackConsumersPool<GreedyMemoryPool>>,
546}
547
548impl MetricsMemoryPool {
549 const TOP_CONSUMERS_TO_REPORT: usize = 5;
551
552 fn new(limit: usize) -> Self {
553 Self {
554 inner: Arc::new(TrackConsumersPool::new(
555 GreedyMemoryPool::new(limit),
556 NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(),
557 )),
558 }
559 }
560
561 #[inline]
562 fn update_metrics(&self) {
563 QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
564 }
565}
566
567impl MemoryPool for MetricsMemoryPool {
568 fn register(&self, consumer: &MemoryConsumer) {
569 self.inner.register(consumer);
570 }
571
572 fn unregister(&self, consumer: &MemoryConsumer) {
573 self.inner.unregister(consumer);
574 }
575
576 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
577 self.inner.grow(reservation, additional);
578 self.update_metrics();
579 }
580
581 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
582 self.inner.shrink(reservation, shrink);
583 self.update_metrics();
584 }
585
586 fn try_grow(
587 &self,
588 reservation: &MemoryReservation,
589 additional: usize,
590 ) -> datafusion_common::Result<()> {
591 let result = self.inner.try_grow(reservation, additional);
592 if result.is_err() {
593 QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
594 }
595 self.update_metrics();
596 result
597 }
598
599 fn reserved(&self) -> usize {
600 self.inner.reserved()
601 }
602
603 fn memory_limit(&self) -> MemoryLimit {
604 self.inner.memory_limit()
605 }
606}
607
#[cfg(test)]
mod tests {
    use common_base::Plugins;
    use session::context::QueryContext;

    use super::*;
    use crate::options::QueryOptions;

    /// Builds an engine state over an in-memory catalog, with no optional handlers, no
    /// distributed planner, default plugins and default query options.
    fn new_query_engine_state() -> QueryEngineState {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        QueryEngineState::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            None,
            false,
            Plugins::default(),
            QueryOptions::default(),
        )
    }

    /// Two leases taken through the same query context share one registry.
    #[test]
    fn query_engine_state_reuses_query_scoped_dyn_filter_registry_lease() {
        let engine_state = new_query_engine_state();
        let ctx = QueryContext::arc();

        let lease_a = engine_state
            .acquire_remote_dyn_filter_registry_lease(&ctx)
            .unwrap();
        let lease_b = engine_state
            .acquire_remote_dyn_filter_registry_lease(&ctx)
            .unwrap();

        assert!(lease_a.ptr_eq(&lease_b));
        assert_eq!(
            engine_state.dyn_filter_registry_manager().registry_count(),
            1
        );
        assert_eq!(
            lease_a.registry().query_id(),
            ctx.remote_query_id_value().unwrap()
        );
    }

    /// A fresh query context carries a remote query id, and the lease's registry uses it.
    #[test]
    fn query_engine_state_relies_on_query_context_remote_query_id_contract() {
        let engine_state = new_query_engine_state();
        let ctx = QueryContext::arc();

        assert!(ctx.remote_query_id_value().is_some());

        let lease = engine_state
            .acquire_remote_dyn_filter_registry_lease(&ctx)
            .unwrap();

        assert_eq!(
            lease.registry().query_id(),
            ctx.remote_query_id_value().unwrap()
        );
        assert_eq!(
            engine_state.dyn_filter_registry_manager().registry_count(),
            1
        );
    }

    /// Leases taken through different query contexts get separate registries.
    #[test]
    fn query_engine_state_separates_registries_for_different_query_contexts() {
        let engine_state = new_query_engine_state();
        let ctx_a = QueryContext::arc();
        let ctx_b = QueryContext::arc();

        let lease_a = engine_state
            .acquire_remote_dyn_filter_registry_lease(&ctx_a)
            .unwrap();
        let lease_b = engine_state
            .acquire_remote_dyn_filter_registry_lease(&ctx_b)
            .unwrap();

        assert!(!lease_a.ptr_eq(&lease_b));
        assert_eq!(
            engine_state.dyn_filter_registry_manager().registry_count(),
            2
        );
        assert_eq!(
            lease_a.registry().query_id(),
            ctx_a.remote_query_id_value().unwrap()
        );
        assert_eq!(
            lease_b.registry().query_id(),
            ctx_b.remote_query_id_value().unwrap()
        );
    }
}