1use std::collections::HashMap;
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::sync::{Arc, RwLock};
19
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_base::Plugins;
23use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
24use common_function::function_factory::ScalarFunctionFactory;
25use common_function::function_registry::FUNCTION_REGISTRY;
26use common_function::handlers::{
27 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
28};
29use common_function::state::FunctionState;
30use common_stat::get_total_memory_bytes;
31use common_telemetry::warn;
32use datafusion::catalog::TableFunction;
33use datafusion::dataframe::DataFrame;
34use datafusion::error::Result as DfResult;
35use datafusion::execution::SessionStateBuilder;
36use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
37use datafusion::execution::memory_pool::{
38 GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
39 TrackConsumersPool,
40};
41use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
42use datafusion::physical_optimizer::PhysicalOptimizerRule;
43use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
44use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
45use datafusion::physical_plan::ExecutionPlan;
46use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
47use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan, WindowUDF};
48use datafusion_optimizer::Analyzer;
49use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites;
50use datafusion_optimizer::optimizer::Optimizer;
51use partition::manager::PartitionRuleManagerRef;
52use promql::extension_plan::PromExtensionPlanner;
53use table::TableRef;
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::QueryEngineContext;
57use crate::dist_plan::{
58 DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
59};
60use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
61use crate::optimizer::ExtensionAnalyzerRule;
62use crate::optimizer::const_normalization::ConstNormalizationRule;
63use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
64use crate::optimizer::count_nest_aggr::CountNestAggrRule;
65use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
66use crate::optimizer::json_type_concretize::JsonTypeConcretizeRule;
67use crate::optimizer::parallelize_scan::ParallelizeScan;
68use crate::optimizer::pass_distribution::PassDistribution;
69use crate::optimizer::remove_duplicate::RemoveDuplicate;
70use crate::optimizer::scan_hint::ScanHintRule;
71use crate::optimizer::string_normalization::StringNormalizationRule;
72use crate::optimizer::transcribe_atat::TranscribeAtatRule;
73use crate::optimizer::type_conversion::TypeConversionRule;
74use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
75use crate::options::QueryOptions as QueryOptionsNew;
76use crate::query_engine::DefaultSerializer;
77use crate::query_engine::options::QueryOptions;
78use crate::range_select::planner::RangeSelectPlanner;
79use crate::region_query::RegionQueryHandlerRef;
80
/// Shared, cheaply-cloneable state behind the query engine: the configured
/// DataFusion session context plus GreptimeDB-specific function registries,
/// extension analyzer rules, and handler state.
#[derive(Clone)]
pub struct QueryEngineState {
    // DataFusion session pre-configured with our analyzer/optimizer/
    // physical-optimizer rule sets (assembled in `QueryEngineState::new`).
    df_context: SessionContext,
    catalog_manager: CatalogManagerRef,
    // Handlers (table mutation / procedure / flow) shared with UDF execution.
    function_state: Arc<FunctionState>,
    // Name -> function registries, guarded for concurrent registration/lookup.
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
    // Context-aware rules applied by `optimize_by_extension_rules` before the
    // standard DataFusion analyzer passes.
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    plugins: Plugins,
}
93
94impl fmt::Debug for QueryEngineState {
95 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
96 f.debug_struct("QueryEngineState")
97 .field("state", &self.df_context.state())
98 .finish()
99 }
100}
101
impl QueryEngineState {
    /// Creates the engine state.
    ///
    /// This wires together:
    /// - a DataFusion runtime with an optional metrics-reporting memory pool,
    /// - the session configuration,
    /// - GreptimeDB's analyzer / optimizer / physical-optimizer rule sets,
    /// - the custom query planner ([`DfQueryPlanner`]).
    ///
    /// `with_dist_planner` toggles the distributed-planning analyzer rule;
    /// the `*_handler` arguments are stored in the shared [`FunctionState`].
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_list: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
        options: QueryOptionsNew,
    ) -> Self {
        // Resolve the configured pool size against total system memory; a
        // resolved size of 0 means "no limit" and uses the default runtime.
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
        let runtime_env = if memory_pool_size > 0 {
            Arc::new(
                RuntimeEnvBuilder::new()
                    .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
                    .build()
                    .expect("Failed to build RuntimeEnv"),
            )
        } else {
            Arc::new(RuntimeEnv::default())
        };
        // Catalogs/schemas are managed by GreptimeDB itself, so DataFusion
        // must not create its own defaults.
        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        if options.parallelism > 0 {
            session_config = session_config.with_target_partitions(options.parallelism);
        }
        if options.allow_query_fallback {
            // Propagate the fallback flag to the distributed planner through
            // a session-config extension.
            session_config
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        }

        // NOTE(review): disables DataFusion's physical aggregate schema
        // check — presumably our rewritten aggregate plans would otherwise
        // fail it; confirm before removing.
        session_config
            .options_mut()
            .execution
            .skip_physical_aggregate_schema_check = true;

        // Context-aware extension rules run by `optimize_by_extension_rules`
        // (type conversion first, then nested-count rewriting).
        let mut extension_rules = Vec::new();

        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
        extension_rules.push(Arc::new(CountNestAggrRule) as _);

        // Standard analyzer rules. Each `insert(0, ..)` prepends, so the most
        // recently inserted rule runs earliest.
        let mut analyzer = Analyzer::new();
        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
        analyzer
            .rules
            .insert(0, Arc::new(CountWildcardToTimeIndexRule));
        analyzer.rules.push(Arc::new(ConstNormalizationRule));

        // Function rewrites are prepended last so they run before all other
        // analyzer rules and later rules see the rewritten expressions.
        analyzer.rules.insert(
            0,
            Arc::new(ApplyFunctionRewrites::new(
                FUNCTION_REGISTRY.function_rewrites(),
            )),
        );

        if with_dist_planner {
            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
        }
        analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));

        // Extra logical-optimizer rules appended after DataFusion's defaults.
        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(ScanHintRule));
        optimizer.rules.push(Arc::new(JsonTypeConcretizeRule));

        // Physical-optimizer rules. The fixed indices (5, 6, 7) position our
        // rules at specific points inside DataFusion's default rule list, so
        // they are sensitive to upstream changes in that list.
        let mut physical_optimizer = PhysicalOptimizer::new();
        physical_optimizer
            .rules
            .insert(5, Arc::new(ParallelizeScan));
        physical_optimizer
            .rules
            .insert(6, Arc::new(PassDistribution));
        physical_optimizer.rules.insert(
            7,
            Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
        );
        physical_optimizer
            .rules
            .push(Arc::new(WindowedSortPhysicalRule));
        physical_optimizer
            .rules
            .push(Arc::new(MatchesConstantTermOptimizer));
        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
        // Re-home the sanity check at the very end so it validates the plan
        // produced by all of the rules above.
        Self::remove_physical_optimizer_rule(
            &mut physical_optimizer.rules,
            SanityCheckPlan {}.name(),
        );
        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));

        let session_state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(runtime_env)
            .with_default_features()
            .with_analyzer_rules(analyzer.rules)
            .with_serializer_registry(Arc::new(DefaultSerializer))
            .with_query_planner(Arc::new(DfQueryPlanner::new(
                catalog_list.clone(),
                partition_rule_manager,
                region_query_handler,
            )))
            .with_optimizer_rules(optimizer.rules)
            .with_physical_optimizer_rules(physical_optimizer.rules)
            .build();

        let df_context = SessionContext::new_with_state(session_state);
        // Register alias names (e.g. `ucase` for `upper`) for built-ins.
        register_function_aliases(&df_context);

        Self {
            df_context,
            catalog_manager: catalog_list,
            function_state: Arc::new(FunctionState {
                table_mutation_handler,
                procedure_service_handler,
                flow_service_handler,
            }),
            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
            table_functions: Arc::new(RwLock::new(HashMap::new())),
            extension_rules,
            plugins,
            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Removes every physical-optimizer rule whose `name()` equals `name`.
    fn remove_physical_optimizer_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        rules.retain(|rule| rule.name() != name);
    }

    /// Runs the context-aware extension analyzer rules over `plan` in
    /// registration order, short-circuiting on the first error.
    pub fn optimize_by_extension_rules(
        &self,
        plan: DfLogicalPlan,
        context: &QueryEngineContext,
    ) -> DfResult<DfLogicalPlan> {
        self.extension_rules
            .iter()
            .try_fold(plan, |acc_plan, rule| {
                rule.analyze(acc_plan, context, self.session_state().config_options())
            })
    }

    /// Optimizes `plan` with the session's standard DataFusion optimizer.
    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
        self.session_state().optimize(&plan)
    }

    /// Looks up a registered scalar function factory by name.
    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
        self.scalar_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Names of all registered scalar functions (unordered).
    pub fn scalar_names(&self) -> Vec<String> {
        self.scalar_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Looks up a registered aggregate UDF by name.
    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
        self.aggr_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Names of all registered aggregate functions (unordered).
    pub fn aggr_names(&self) -> Vec<String> {
        self.aggr_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Looks up a registered table function by name.
    pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
        self.table_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Names of all registered table functions (unordered).
    pub fn table_function_names(&self) -> Vec<String> {
        self.table_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Registers a scalar function factory, replacing any existing entry
    /// under the same name and logging a warning on replacement.
    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        let name = func.name().to_string();
        let x = self
            .scalar_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);

        if x.is_some() {
            warn!("Already registered scalar function '{name}'");
        }
    }

    /// Registers an aggregate UDF.
    ///
    /// NOTE(review): unlike scalar/table registration (which only warn),
    /// this panics on a duplicate name — presumably duplicate aggregate
    /// registration is considered a programming bug; confirm before relaxing.
    pub fn register_aggr_function(&self, func: AggregateUDF) {
        let name = func.name().to_string();
        let x = self
            .aggr_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);
        assert!(
            x.is_none(),
            "Already registered aggregate function '{name}'"
        );
    }

    /// Registers a table function, replacing any existing entry under the
    /// same name and logging a warning on replacement.
    pub fn register_table_function(&self, func: Arc<TableFunction>) {
        let name = func.name();
        let x = self
            .table_functions
            .write()
            .unwrap()
            .insert(name.to_string(), func.clone());

        if x.is_some() {
            warn!("Already registered table function '{name}'");
        }
    }

    /// Registers a window UDF directly with the DataFusion session context.
    pub fn register_window_function(&self, func: WindowUDF) {
        self.df_context.register_udwf(func);
    }

    /// The catalog manager backing this engine.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Shared handler state made available to function execution.
    pub fn function_state(&self) -> Arc<FunctionState> {
        self.function_state.clone()
    }

    /// Handler for table mutations, if one was provided at construction.
    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
        self.function_state.table_mutation_handler.as_ref()
    }

    /// Handler for procedure services, if one was provided at construction.
    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
        self.function_state.procedure_service_handler.as_ref()
    }

    /// Whether cross-catalog queries are disallowed, as configured by the
    /// plugin-provided [`QueryOptions`]; defaults to `false` when unset.
    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
        self.plugins
            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
            .unwrap_or(false)
    }

    /// Returns an owned snapshot of the current DataFusion session state.
    pub fn session_state(&self) -> SessionState {
        self.df_context.state()
    }

    /// Opens `table` as a DataFusion [`DataFrame`] via the table provider
    /// adapter.
    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
        self.df_context
            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
    }
}
424
/// Query planner wrapping DataFusion's default physical planner with
/// GreptimeDB's extension planners (PromQL, range select, and — when the
/// required handlers exist — distributed planning and merge sort).
struct DfQueryPlanner {
    physical_planner: DefaultPhysicalPlanner,
}
428
429impl fmt::Debug for DfQueryPlanner {
430 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
431 f.debug_struct("DfQueryPlanner").finish()
432 }
433}
434
#[async_trait]
impl QueryPlanner for DfQueryPlanner {
    /// Converts a logical plan into an execution plan by delegating to the
    /// wrapped [`DefaultPhysicalPlanner`], which carries our extension
    /// planners.
    async fn create_physical_plan(
        &self,
        logical_plan: &DfLogicalPlan,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        self.physical_planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}
447
/// `(target, alias)` pairs of extra names registered for built-in scalar
/// functions (the aliases match common MySQL spellings, e.g. `ucase`).
const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[
    ("upper", "ucase"),
    ("lower", "lcase"),
    ("ceil", "ceiling"),
    ("substr", "mid"),
    ("random", "rand"),
];
456
/// `(target, alias)` pairs of extra names registered for built-in aggregate
/// functions.
const AGGREGATE_FUNCTION_ALIASES: &[(&str, &str)] =
    &[("stddev_pop", "std"), ("var_pop", "variance")];
460
461fn register_function_aliases(ctx: &SessionContext) {
466 let state = ctx.state();
467
468 for (target, alias) in SCALAR_FUNCTION_ALIASES {
469 if let Some(func) = state.scalar_functions().get(*target) {
470 let aliased = func.as_ref().clone().with_aliases([*alias]);
471 ctx.register_udf(aliased);
472 }
473 }
474
475 for (target, alias) in AGGREGATE_FUNCTION_ALIASES {
476 if let Some(func) = state.aggregate_functions().get(*target) {
477 let aliased = func.as_ref().clone().with_aliases([*alias]);
478 ctx.register_udaf(aliased);
479 }
480 }
481}
482
483impl DfQueryPlanner {
484 fn new(
485 catalog_manager: CatalogManagerRef,
486 partition_rule_manager: Option<PartitionRuleManagerRef>,
487 region_query_handler: Option<RegionQueryHandlerRef>,
488 ) -> Self {
489 let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
490 vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
491 if let (Some(region_query_handler), Some(partition_rule_manager)) =
492 (region_query_handler, partition_rule_manager)
493 {
494 planners.push(Arc::new(DistExtensionPlanner::new(
495 catalog_manager,
496 partition_rule_manager,
497 region_query_handler,
498 )));
499 planners.push(Arc::new(MergeSortExtensionPlanner {}));
500 }
501 Self {
502 physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
503 }
504 }
505}
506
/// A [`MemoryPool`] wrapper that mirrors pool activity into engine metrics
/// (usage gauge, rejection counter) while delegating all accounting to a
/// consumer-tracking greedy pool.
#[derive(Debug)]
struct MetricsMemoryPool {
    inner: Arc<TrackConsumersPool<GreedyMemoryPool>>,
}
515
516impl MetricsMemoryPool {
517 const TOP_CONSUMERS_TO_REPORT: usize = 5;
519
520 fn new(limit: usize) -> Self {
521 Self {
522 inner: Arc::new(TrackConsumersPool::new(
523 GreedyMemoryPool::new(limit),
524 NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(),
525 )),
526 }
527 }
528
529 #[inline]
530 fn update_metrics(&self) {
531 QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
532 }
533}
534
535impl MemoryPool for MetricsMemoryPool {
536 fn register(&self, consumer: &MemoryConsumer) {
537 self.inner.register(consumer);
538 }
539
540 fn unregister(&self, consumer: &MemoryConsumer) {
541 self.inner.unregister(consumer);
542 }
543
544 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
545 self.inner.grow(reservation, additional);
546 self.update_metrics();
547 }
548
549 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
550 self.inner.shrink(reservation, shrink);
551 self.update_metrics();
552 }
553
554 fn try_grow(
555 &self,
556 reservation: &MemoryReservation,
557 additional: usize,
558 ) -> datafusion_common::Result<()> {
559 let result = self.inner.try_grow(reservation, additional);
560 if result.is_err() {
561 QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
562 }
563 self.update_metrics();
564 result
565 }
566
567 fn reserved(&self) -> usize {
568 self.inner.reserved()
569 }
570
571 fn memory_limit(&self) -> MemoryLimit {
572 self.inner.memory_limit()
573 }
574}