1use std::collections::HashMap;
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::sync::{Arc, RwLock};
19
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_base::Plugins;
23use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
24use common_function::function_factory::ScalarFunctionFactory;
25use common_function::function_registry::FUNCTION_REGISTRY;
26use common_function::handlers::{
27 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
28};
29use common_function::state::FunctionState;
30use common_stat::get_total_memory_bytes;
31use common_telemetry::warn;
32use datafusion::catalog::TableFunction;
33use datafusion::dataframe::DataFrame;
34use datafusion::error::Result as DfResult;
35use datafusion::execution::SessionStateBuilder;
36use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
37use datafusion::execution::memory_pool::{
38 GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
39 TrackConsumersPool,
40};
41use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
42use datafusion::physical_optimizer::PhysicalOptimizerRule;
43use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
44use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
45use datafusion::physical_plan::ExecutionPlan;
46use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
47use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan, WindowUDF};
48use datafusion_optimizer::Analyzer;
49use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites;
50use datafusion_optimizer::optimizer::Optimizer;
51use partition::manager::PartitionRuleManagerRef;
52use promql::extension_plan::PromExtensionPlanner;
53use table::TableRef;
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::QueryEngineContext;
57use crate::dist_plan::{
58 DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
59};
60use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
61use crate::optimizer::ExtensionAnalyzerRule;
62use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
63use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
64use crate::optimizer::parallelize_scan::ParallelizeScan;
65use crate::optimizer::pass_distribution::PassDistribution;
66use crate::optimizer::remove_duplicate::RemoveDuplicate;
67use crate::optimizer::scan_hint::ScanHintRule;
68use crate::optimizer::string_normalization::StringNormalizationRule;
69use crate::optimizer::transcribe_atat::TranscribeAtatRule;
70use crate::optimizer::type_conversion::TypeConversionRule;
71use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
72use crate::options::QueryOptions as QueryOptionsNew;
73use crate::query_engine::DefaultSerializer;
74use crate::query_engine::options::QueryOptions;
75use crate::range_select::planner::RangeSelectPlanner;
76use crate::region_query::RegionQueryHandlerRef;
77
/// Shared, cheaply-cloneable state backing the query engine.
///
/// All fields are either `Arc`-wrapped or internally reference-counted
/// (`SessionContext`, `Plugins`), so `Clone` produces handles to the same
/// underlying state rather than deep copies.
#[derive(Clone)]
pub struct QueryEngineState {
    // DataFusion session used to plan and execute queries.
    df_context: SessionContext,
    // Catalog manager resolving databases/tables for this engine.
    catalog_manager: CatalogManagerRef,
    // Handlers (table mutation / procedure / flow) shared with UDF execution.
    function_state: Arc<FunctionState>,
    // Registries of engine-registered functions, keyed by function name.
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
    // Extra analyzer rules that need a `QueryEngineContext`, applied before
    // DataFusion's own optimization (see `optimize_by_extension_rules`).
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    plugins: Plugins,
}
90
91impl fmt::Debug for QueryEngineState {
92 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
93 f.debug_struct("QueryEngineState")
94 .field("state", &self.df_context.state())
95 .finish()
96 }
97}
98
impl QueryEngineState {
    /// Builds the engine state: configures the DataFusion runtime (optionally
    /// with a metered memory pool), assembles analyzer / logical / physical
    /// optimizer rule lists, and creates the `SessionContext`.
    ///
    /// `with_dist_planner` enables the distributed planner analyzer rule;
    /// the handlers are optional capabilities forwarded to function state.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_list: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
        options: QueryOptionsNew,
    ) -> Self {
        // `max(0)` guards against a negative reading before the cast to u64.
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
        // A zero pool size means "unlimited": fall back to the default
        // runtime without a bounding memory pool.
        let runtime_env = if memory_pool_size > 0 {
            Arc::new(
                RuntimeEnvBuilder::new()
                    .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
                    .build()
                    .expect("Failed to build RuntimeEnv"),
            )
        } else {
            Arc::new(RuntimeEnv::default())
        };
        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        if options.parallelism > 0 {
            session_config = session_config.with_target_partitions(options.parallelism);
        }
        if options.allow_query_fallback {
            session_config
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        }

        // NOTE(review): disabling the physical aggregate schema check —
        // presumably required by custom aggregate plans; confirm why.
        session_config
            .options_mut()
            .execution
            .skip_physical_aggregate_schema_check = true;

        let mut extension_rules = Vec::new();

        // Type conversion runs first among the context-aware extension rules.
        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);

        // Analyzer rules: each `insert(0, ..)` prepends, so the LAST inserted
        // rule runs FIRST. Effective head order: function rewrites,
        // CountWildcardToTimeIndex, StringNormalization, TranscribeAtat,
        // then DataFusion's defaults. Order is intentional — do not reorder.
        let mut analyzer = Analyzer::new();
        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
        analyzer
            .rules
            .insert(0, Arc::new(CountWildcardToTimeIndexRule));

        analyzer.rules.insert(
            0,
            Arc::new(ApplyFunctionRewrites::new(
                FUNCTION_REGISTRY.function_rewrites(),
            )),
        );

        if with_dist_planner {
            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
        }
        analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));

        // Logical optimizer: DataFusion defaults plus the scan-hint rule.
        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(ScanHintRule));

        // Physical optimizer: custom rules spliced into DataFusion's default
        // list at fixed indices. NOTE(review): indices 5/6/7 assume the
        // default rule ordering of the pinned DataFusion version — re-verify
        // on every DataFusion upgrade.
        let mut physical_optimizer = PhysicalOptimizer::new();
        physical_optimizer
            .rules
            .insert(5, Arc::new(ParallelizeScan));
        physical_optimizer
            .rules
            .insert(6, Arc::new(PassDistribution));
        physical_optimizer.rules.insert(
            7,
            Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
        );
        physical_optimizer
            .rules
            .push(Arc::new(WindowedSortPhysicalRule));
        physical_optimizer
            .rules
            .push(Arc::new(MatchesConstantTermOptimizer));
        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
        // Re-append the sanity check so it runs LAST, after all custom rules.
        Self::remove_physical_optimizer_rule(
            &mut physical_optimizer.rules,
            SanityCheckPlan {}.name(),
        );
        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));

        let session_state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(runtime_env)
            .with_default_features()
            .with_analyzer_rules(analyzer.rules)
            .with_serializer_registry(Arc::new(DefaultSerializer))
            .with_query_planner(Arc::new(DfQueryPlanner::new(
                catalog_list.clone(),
                partition_rule_manager,
                region_query_handler,
            )))
            .with_optimizer_rules(optimizer.rules)
            .with_physical_optimizer_rules(physical_optimizer.rules)
            .build();

        let df_context = SessionContext::new_with_state(session_state);
        // Register SQL-compat aliases (ucase, lcase, ...) on the context.
        register_function_aliases(&df_context);

        Self {
            df_context,
            catalog_manager: catalog_list,
            function_state: Arc::new(FunctionState {
                table_mutation_handler,
                procedure_service_handler,
                flow_service_handler,
            }),
            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
            table_functions: Arc::new(RwLock::new(HashMap::new())),
            extension_rules,
            plugins,
            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Removes every physical optimizer rule whose `name()` matches `name`.
    fn remove_physical_optimizer_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        rules.retain(|rule| rule.name() != name);
    }

    /// Runs the context-aware extension analyzer rules over `plan`, in
    /// registration order, short-circuiting on the first error.
    pub fn optimize_by_extension_rules(
        &self,
        plan: DfLogicalPlan,
        context: &QueryEngineContext,
    ) -> DfResult<DfLogicalPlan> {
        self.extension_rules
            .iter()
            .try_fold(plan, |acc_plan, rule| {
                rule.analyze(acc_plan, context, self.session_state().config_options())
            })
    }

    /// Optimizes `plan` with the session's full analyzer + optimizer pipeline.
    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
        self.session_state().optimize(&plan)
    }

    /// Looks up an engine-registered scalar function factory by name.
    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
        self.scalar_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Names of all engine-registered scalar functions.
    pub fn scalar_names(&self) -> Vec<String> {
        self.scalar_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Looks up an engine-registered aggregate function by name.
    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
        self.aggr_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Names of all engine-registered aggregate functions.
    pub fn aggr_names(&self) -> Vec<String> {
        self.aggr_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Looks up an engine-registered table function by name.
    pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
        self.table_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Names of all engine-registered table functions.
    pub fn table_function_names(&self) -> Vec<String> {
        self.table_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Registers a scalar function; re-registering the same name replaces the
    /// previous entry and logs a warning.
    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        let name = func.name().to_string();
        let x = self
            .scalar_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);

        if x.is_some() {
            warn!("Already registered scalar function '{name}'");
        }
    }

    /// Registers an aggregate function.
    ///
    /// # Panics
    /// Panics if a function with the same name was already registered.
    /// NOTE(review): siblings (`register_scalar_function`,
    /// `register_table_function`) only warn on duplicates — confirm the
    /// stricter panic here is intentional.
    pub fn register_aggr_function(&self, func: AggregateUDF) {
        let name = func.name().to_string();
        let x = self
            .aggr_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);
        assert!(
            x.is_none(),
            "Already registered aggregate function '{name}'"
        );
    }

    /// Registers a table function; re-registering the same name replaces the
    /// previous entry and logs a warning.
    pub fn register_table_function(&self, func: Arc<TableFunction>) {
        let name = func.name();
        let x = self
            .table_functions
            .write()
            .unwrap()
            .insert(name.to_string(), func.clone());

        if x.is_some() {
            warn!("Already registered table function '{name}'");
        }
    }

    /// Registers a window function directly on the DataFusion context.
    pub fn register_window_function(&self, func: WindowUDF) {
        self.df_context.register_udwf(func);
    }

    /// The catalog manager backing this engine.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Shared function state (mutation / procedure / flow handlers).
    pub fn function_state(&self) -> Arc<FunctionState> {
        self.function_state.clone()
    }

    /// Table mutation handler, if one was provided at construction.
    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
        self.function_state.table_mutation_handler.as_ref()
    }

    /// Procedure service handler, if one was provided at construction.
    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
        self.function_state.procedure_service_handler.as_ref()
    }

    /// Whether cross-catalog queries are disallowed by plugin-provided
    /// `QueryOptions`; defaults to `false` when the plugin is absent.
    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
        self.plugins
            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
            .unwrap_or(false)
    }

    /// A snapshot of the current DataFusion session state.
    pub fn session_state(&self) -> SessionState {
        self.df_context.state()
    }

    /// Wraps `table` in a DataFusion provider adapter and opens it as a
    /// `DataFrame` on this session.
    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
        self.df_context
            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
    }
}
414
/// Query planner handed to DataFusion; wraps a [`DefaultPhysicalPlanner`]
/// extended with GreptimeDB-specific extension planners (PromQL, range
/// select, and optionally distributed planning — see `DfQueryPlanner::new`).
struct DfQueryPlanner {
    physical_planner: DefaultPhysicalPlanner,
}
418
419impl fmt::Debug for DfQueryPlanner {
420 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
421 f.debug_struct("DfQueryPlanner").finish()
422 }
423}
424
425#[async_trait]
426impl QueryPlanner for DfQueryPlanner {
427 async fn create_physical_plan(
428 &self,
429 logical_plan: &DfLogicalPlan,
430 session_state: &SessionState,
431 ) -> DfResult<Arc<dyn ExecutionPlan>> {
432 self.physical_planner
433 .create_physical_plan(logical_plan, session_state)
434 .await
435 }
436}
437
/// `(canonical_name, alias)` pairs for scalar functions; each alias is
/// attached to the canonical UDF by `register_function_aliases`.
const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[
    ("upper", "ucase"),
    ("lower", "lcase"),
    ("ceil", "ceiling"),
    ("substr", "mid"),
    ("random", "rand"),
];
446
/// `(canonical_name, alias)` pairs for aggregate functions; each alias is
/// attached to the canonical UDAF by `register_function_aliases`.
const AGGREGATE_FUNCTION_ALIASES: &[(&str, &str)] =
    &[("stddev_pop", "std"), ("var_pop", "variance")];
450
451fn register_function_aliases(ctx: &SessionContext) {
456 let state = ctx.state();
457
458 for (target, alias) in SCALAR_FUNCTION_ALIASES {
459 if let Some(func) = state.scalar_functions().get(*target) {
460 let aliased = func.as_ref().clone().with_aliases([*alias]);
461 ctx.register_udf(aliased);
462 }
463 }
464
465 for (target, alias) in AGGREGATE_FUNCTION_ALIASES {
466 if let Some(func) = state.aggregate_functions().get(*target) {
467 let aliased = func.as_ref().clone().with_aliases([*alias]);
468 ctx.register_udaf(aliased);
469 }
470 }
471}
472
473impl DfQueryPlanner {
474 fn new(
475 catalog_manager: CatalogManagerRef,
476 partition_rule_manager: Option<PartitionRuleManagerRef>,
477 region_query_handler: Option<RegionQueryHandlerRef>,
478 ) -> Self {
479 let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
480 vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
481 if let (Some(region_query_handler), Some(partition_rule_manager)) =
482 (region_query_handler, partition_rule_manager)
483 {
484 planners.push(Arc::new(DistExtensionPlanner::new(
485 catalog_manager,
486 partition_rule_manager,
487 region_query_handler,
488 )));
489 planners.push(Arc::new(MergeSortExtensionPlanner {}));
490 }
491 Self {
492 physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
493 }
494 }
495}
496
/// Memory pool that wraps a [`GreedyMemoryPool`] (via [`TrackConsumersPool`])
/// and mirrors usage/rejection into Prometheus-style metrics
/// (`QUERY_MEMORY_POOL_USAGE_BYTES`, `QUERY_MEMORY_POOL_REJECTED_TOTAL`).
#[derive(Debug)]
struct MetricsMemoryPool {
    inner: Arc<TrackConsumersPool<GreedyMemoryPool>>,
}
505
506impl MetricsMemoryPool {
507 const TOP_CONSUMERS_TO_REPORT: usize = 5;
509
510 fn new(limit: usize) -> Self {
511 Self {
512 inner: Arc::new(TrackConsumersPool::new(
513 GreedyMemoryPool::new(limit),
514 NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(),
515 )),
516 }
517 }
518
519 #[inline]
520 fn update_metrics(&self) {
521 QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
522 }
523}
524
525impl MemoryPool for MetricsMemoryPool {
526 fn register(&self, consumer: &MemoryConsumer) {
527 self.inner.register(consumer);
528 }
529
530 fn unregister(&self, consumer: &MemoryConsumer) {
531 self.inner.unregister(consumer);
532 }
533
534 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
535 self.inner.grow(reservation, additional);
536 self.update_metrics();
537 }
538
539 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
540 self.inner.shrink(reservation, shrink);
541 self.update_metrics();
542 }
543
544 fn try_grow(
545 &self,
546 reservation: &MemoryReservation,
547 additional: usize,
548 ) -> datafusion_common::Result<()> {
549 let result = self.inner.try_grow(reservation, additional);
550 if result.is_err() {
551 QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
552 }
553 self.update_metrics();
554 result
555 }
556
557 fn reserved(&self) -> usize {
558 self.inner.reserved()
559 }
560
561 fn memory_limit(&self) -> MemoryLimit {
562 self.inner.memory_limit()
563 }
564}