1use std::collections::HashMap;
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::sync::{Arc, RwLock};
19
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_base::Plugins;
23use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
24use common_function::function_factory::ScalarFunctionFactory;
25use common_function::function_registry::FUNCTION_REGISTRY;
26use common_function::handlers::{
27 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
28};
29use common_function::state::FunctionState;
30use common_stat::get_total_memory_bytes;
31use common_telemetry::warn;
32use datafusion::catalog::TableFunction;
33use datafusion::dataframe::DataFrame;
34use datafusion::error::Result as DfResult;
35use datafusion::execution::SessionStateBuilder;
36use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
37use datafusion::execution::memory_pool::{
38 GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
39 TrackConsumersPool,
40};
41use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
42use datafusion::physical_optimizer::PhysicalOptimizerRule;
43use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
44use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
45use datafusion::physical_plan::ExecutionPlan;
46use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
47use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan, WindowUDF};
48use datafusion_optimizer::Analyzer;
49use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites;
50use datafusion_optimizer::optimizer::Optimizer;
51use partition::manager::PartitionRuleManagerRef;
52use promql::extension_plan::PromExtensionPlanner;
53use table::TableRef;
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::QueryEngineContext;
57use crate::dist_plan::{
58 DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
59};
60use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
61use crate::optimizer::ExtensionAnalyzerRule;
62use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
63use crate::optimizer::count_nest_aggr::CountNestAggrRule;
64use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
65use crate::optimizer::json_type_concretize::JsonTypeConcretizeRule;
66use crate::optimizer::parallelize_scan::ParallelizeScan;
67use crate::optimizer::pass_distribution::PassDistribution;
68use crate::optimizer::remove_duplicate::RemoveDuplicate;
69use crate::optimizer::scan_hint::ScanHintRule;
70use crate::optimizer::string_normalization::StringNormalizationRule;
71use crate::optimizer::transcribe_atat::TranscribeAtatRule;
72use crate::optimizer::type_conversion::TypeConversionRule;
73use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
74use crate::options::QueryOptions as QueryOptionsNew;
75use crate::query_engine::DefaultSerializer;
76use crate::query_engine::options::QueryOptions;
77use crate::range_select::planner::RangeSelectPlanner;
78use crate::region_query::RegionQueryHandlerRef;
79
80#[derive(Clone)]
82pub struct QueryEngineState {
83 df_context: SessionContext,
84 catalog_manager: CatalogManagerRef,
85 function_state: Arc<FunctionState>,
86 scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
87 aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
88 table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
89 extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
90 plugins: Plugins,
91}
92
93impl fmt::Debug for QueryEngineState {
94 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
95 f.debug_struct("QueryEngineState")
96 .field("state", &self.df_context.state())
97 .finish()
98 }
99}
100
101impl QueryEngineState {
102 #[allow(clippy::too_many_arguments)]
103 pub fn new(
104 catalog_list: CatalogManagerRef,
105 partition_rule_manager: Option<PartitionRuleManagerRef>,
106 region_query_handler: Option<RegionQueryHandlerRef>,
107 table_mutation_handler: Option<TableMutationHandlerRef>,
108 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
109 flow_service_handler: Option<FlowServiceHandlerRef>,
110 with_dist_planner: bool,
111 plugins: Plugins,
112 options: QueryOptionsNew,
113 ) -> Self {
114 let total_memory = get_total_memory_bytes().max(0) as u64;
115 let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
116 let runtime_env = if memory_pool_size > 0 {
117 Arc::new(
118 RuntimeEnvBuilder::new()
119 .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
120 .build()
121 .expect("Failed to build RuntimeEnv"),
122 )
123 } else {
124 Arc::new(RuntimeEnv::default())
125 };
126 let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
127 if options.parallelism > 0 {
128 session_config = session_config.with_target_partitions(options.parallelism);
129 }
130 if options.allow_query_fallback {
131 session_config
132 .options_mut()
133 .extensions
134 .insert(DistPlannerOptions {
135 allow_query_fallback: true,
136 });
137 }
138
139 session_config
142 .options_mut()
143 .execution
144 .skip_physical_aggregate_schema_check = true;
145
146 let mut extension_rules = Vec::new();
148
149 extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
151 extension_rules.push(Arc::new(CountNestAggrRule) as _);
152
153 let mut analyzer = Analyzer::new();
155 analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
156 analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
157 analyzer
158 .rules
159 .insert(0, Arc::new(CountWildcardToTimeIndexRule));
160
161 analyzer.rules.insert(
165 0,
166 Arc::new(ApplyFunctionRewrites::new(
167 FUNCTION_REGISTRY.function_rewrites(),
168 )),
169 );
170
171 if with_dist_planner {
172 analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
173 }
174 analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));
175
176 let mut optimizer = Optimizer::new();
177 optimizer.rules.push(Arc::new(ScanHintRule));
178 optimizer.rules.push(Arc::new(JsonTypeConcretizeRule));
179
180 let mut physical_optimizer = PhysicalOptimizer::new();
182 physical_optimizer
184 .rules
185 .insert(5, Arc::new(ParallelizeScan));
186 physical_optimizer
188 .rules
189 .insert(6, Arc::new(PassDistribution));
190 physical_optimizer.rules.insert(
192 7,
193 Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
194 );
195 physical_optimizer
197 .rules
198 .push(Arc::new(WindowedSortPhysicalRule));
199 physical_optimizer
204 .rules
205 .push(Arc::new(MatchesConstantTermOptimizer));
206 physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
208 Self::remove_physical_optimizer_rule(
210 &mut physical_optimizer.rules,
211 SanityCheckPlan {}.name(),
212 );
213 physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));
214
215 let session_state = SessionStateBuilder::new()
216 .with_config(session_config)
217 .with_runtime_env(runtime_env)
218 .with_default_features()
219 .with_analyzer_rules(analyzer.rules)
220 .with_serializer_registry(Arc::new(DefaultSerializer))
221 .with_query_planner(Arc::new(DfQueryPlanner::new(
222 catalog_list.clone(),
223 partition_rule_manager,
224 region_query_handler,
225 )))
226 .with_optimizer_rules(optimizer.rules)
227 .with_physical_optimizer_rules(physical_optimizer.rules)
228 .build();
229
230 let df_context = SessionContext::new_with_state(session_state);
231 register_function_aliases(&df_context);
232
233 Self {
234 df_context,
235 catalog_manager: catalog_list,
236 function_state: Arc::new(FunctionState {
237 table_mutation_handler,
238 procedure_service_handler,
239 flow_service_handler,
240 }),
241 aggr_functions: Arc::new(RwLock::new(HashMap::new())),
242 table_functions: Arc::new(RwLock::new(HashMap::new())),
243 extension_rules,
244 plugins,
245 scalar_functions: Arc::new(RwLock::new(HashMap::new())),
246 }
247 }
248
249 fn remove_physical_optimizer_rule(
250 rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
251 name: &str,
252 ) {
253 rules.retain(|rule| rule.name() != name);
254 }
255
256 pub fn optimize_by_extension_rules(
258 &self,
259 plan: DfLogicalPlan,
260 context: &QueryEngineContext,
261 ) -> DfResult<DfLogicalPlan> {
262 self.extension_rules
263 .iter()
264 .try_fold(plan, |acc_plan, rule| {
265 rule.analyze(acc_plan, context, self.session_state().config_options())
266 })
267 }
268
269 pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
271 self.session_state().optimize(&plan)
272 }
273
274 pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
276 self.scalar_functions
277 .read()
278 .unwrap()
279 .get(function_name)
280 .cloned()
281 }
282
283 pub fn scalar_names(&self) -> Vec<String> {
285 self.scalar_functions
286 .read()
287 .unwrap()
288 .keys()
289 .cloned()
290 .collect()
291 }
292
293 pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
295 self.aggr_functions
296 .read()
297 .unwrap()
298 .get(function_name)
299 .cloned()
300 }
301
302 pub fn aggr_names(&self) -> Vec<String> {
304 self.aggr_functions
305 .read()
306 .unwrap()
307 .keys()
308 .cloned()
309 .collect()
310 }
311
312 pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
314 self.table_functions
315 .read()
316 .unwrap()
317 .get(function_name)
318 .cloned()
319 }
320
321 pub fn table_function_names(&self) -> Vec<String> {
323 self.table_functions
324 .read()
325 .unwrap()
326 .keys()
327 .cloned()
328 .collect()
329 }
330
331 pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
334 let name = func.name().to_string();
335 let x = self
336 .scalar_functions
337 .write()
338 .unwrap()
339 .insert(name.clone(), func);
340
341 if x.is_some() {
342 warn!("Already registered scalar function '{name}'");
343 }
344 }
345
346 pub fn register_aggr_function(&self, func: AggregateUDF) {
355 let name = func.name().to_string();
356 let x = self
357 .aggr_functions
358 .write()
359 .unwrap()
360 .insert(name.clone(), func);
361 assert!(
362 x.is_none(),
363 "Already registered aggregate function '{name}'"
364 );
365 }
366
367 pub fn register_table_function(&self, func: Arc<TableFunction>) {
368 let name = func.name();
369 let x = self
370 .table_functions
371 .write()
372 .unwrap()
373 .insert(name.to_string(), func.clone());
374
375 if x.is_some() {
376 warn!("Already registered table function '{name}'");
377 }
378 }
379
380 pub fn register_window_function(&self, func: WindowUDF) {
385 self.df_context.register_udwf(func);
386 }
387
388 pub fn catalog_manager(&self) -> &CatalogManagerRef {
389 &self.catalog_manager
390 }
391
392 pub fn function_state(&self) -> Arc<FunctionState> {
393 self.function_state.clone()
394 }
395
396 pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
398 self.function_state.table_mutation_handler.as_ref()
399 }
400
401 pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
403 self.function_state.procedure_service_handler.as_ref()
404 }
405
406 pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
407 self.plugins
408 .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
409 .unwrap_or(false)
410 }
411
412 pub fn session_state(&self) -> SessionState {
413 self.df_context.state()
414 }
415
416 pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
418 self.df_context
419 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
420 }
421}
422
423struct DfQueryPlanner {
424 physical_planner: DefaultPhysicalPlanner,
425}
426
427impl fmt::Debug for DfQueryPlanner {
428 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
429 f.debug_struct("DfQueryPlanner").finish()
430 }
431}
432
433#[async_trait]
434impl QueryPlanner for DfQueryPlanner {
435 async fn create_physical_plan(
436 &self,
437 logical_plan: &DfLogicalPlan,
438 session_state: &SessionState,
439 ) -> DfResult<Arc<dyn ExecutionPlan>> {
440 self.physical_planner
441 .create_physical_plan(logical_plan, session_state)
442 .await
443 }
444}
445
/// `(target, alias)` pairs for scalar functions: each `alias` is registered as an extra name
/// for the already-existing function `target`.
const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[
    // (existing function, additional alias)
    ("upper", "ucase"),
    ("lower", "lcase"),
    ("ceil", "ceiling"),
    ("substr", "mid"),
    ("random", "rand"),
];
454
/// `(target, alias)` pairs for aggregate functions: each `alias` is registered as an extra
/// name for the already-existing function `target`.
const AGGREGATE_FUNCTION_ALIASES: &[(&str, &str)] = &[
    // (existing function, additional alias)
    ("stddev_pop", "std"),
    ("var_pop", "variance"),
];
458
459fn register_function_aliases(ctx: &SessionContext) {
464 let state = ctx.state();
465
466 for (target, alias) in SCALAR_FUNCTION_ALIASES {
467 if let Some(func) = state.scalar_functions().get(*target) {
468 let aliased = func.as_ref().clone().with_aliases([*alias]);
469 ctx.register_udf(aliased);
470 }
471 }
472
473 for (target, alias) in AGGREGATE_FUNCTION_ALIASES {
474 if let Some(func) = state.aggregate_functions().get(*target) {
475 let aliased = func.as_ref().clone().with_aliases([*alias]);
476 ctx.register_udaf(aliased);
477 }
478 }
479}
480
481impl DfQueryPlanner {
482 fn new(
483 catalog_manager: CatalogManagerRef,
484 partition_rule_manager: Option<PartitionRuleManagerRef>,
485 region_query_handler: Option<RegionQueryHandlerRef>,
486 ) -> Self {
487 let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
488 vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
489 if let (Some(region_query_handler), Some(partition_rule_manager)) =
490 (region_query_handler, partition_rule_manager)
491 {
492 planners.push(Arc::new(DistExtensionPlanner::new(
493 catalog_manager,
494 partition_rule_manager,
495 region_query_handler,
496 )));
497 planners.push(Arc::new(MergeSortExtensionPlanner {}));
498 }
499 Self {
500 physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
501 }
502 }
503}
504
505#[derive(Debug)]
510struct MetricsMemoryPool {
511 inner: Arc<TrackConsumersPool<GreedyMemoryPool>>,
512}
513
514impl MetricsMemoryPool {
515 const TOP_CONSUMERS_TO_REPORT: usize = 5;
517
518 fn new(limit: usize) -> Self {
519 Self {
520 inner: Arc::new(TrackConsumersPool::new(
521 GreedyMemoryPool::new(limit),
522 NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(),
523 )),
524 }
525 }
526
527 #[inline]
528 fn update_metrics(&self) {
529 QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
530 }
531}
532
533impl MemoryPool for MetricsMemoryPool {
534 fn register(&self, consumer: &MemoryConsumer) {
535 self.inner.register(consumer);
536 }
537
538 fn unregister(&self, consumer: &MemoryConsumer) {
539 self.inner.unregister(consumer);
540 }
541
542 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
543 self.inner.grow(reservation, additional);
544 self.update_metrics();
545 }
546
547 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
548 self.inner.shrink(reservation, shrink);
549 self.update_metrics();
550 }
551
552 fn try_grow(
553 &self,
554 reservation: &MemoryReservation,
555 additional: usize,
556 ) -> datafusion_common::Result<()> {
557 let result = self.inner.try_grow(reservation, additional);
558 if result.is_err() {
559 QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
560 }
561 self.update_metrics();
562 result
563 }
564
565 fn reserved(&self) -> usize {
566 self.inner.reserved()
567 }
568
569 fn memory_limit(&self) -> MemoryLimit {
570 self.inner.memory_limit()
571 }
572}