1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::{CreateTableExpr, TableName};
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::tree_node::{Transformed, TreeNode};
29use datafusion_common::utils::quote_identifier;
30use datafusion_common::{DFSchemaRef, TableReference};
31use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp, col, lit};
32use datatypes::schema::Schema;
33use query::QueryEngineRef;
34use query::options::FLOW_INCREMENTAL_MODE;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::{OptionExt, ResultExt};
38use sql::parsers::utils::is_tql;
39use store_api::mito_engine_options::MERGE_MODE_KEY;
40use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
41use table::table::adapter::DfTableProviderAdapter;
42use tokio::sync::oneshot::error::TryRecvError;
43use tokio::sync::{Mutex, oneshot};
44use tokio::time::Instant;
45
46use crate::batching_mode::BatchingModeOptions;
47use crate::batching_mode::checkpoint::checkpoint_mode_label;
48use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc};
49use crate::batching_mode::state::{
50 CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState, to_df_literal,
51};
52use crate::batching_mode::table_creator::{QueryType, create_table_with_expr};
53use crate::batching_mode::time_window::TimeWindowExpr;
54use crate::batching_mode::utils::{
55 AddFilterRewriter, ColumnMatcherRewriter, df_plan_to_sql, gen_plan_with_matching_schema,
56 get_table_info_df_schema, sql_to_df_plan,
57};
58use crate::df_optimizer::apply_df_optimizer;
59use crate::error::{
60 DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, SubstraitEncodeLogicalPlanSnafu,
61 UnexpectedSnafu,
62};
63use crate::metrics::{
64 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
65 METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
66 METRIC_FLOW_ROWS,
67};
68use crate::{Error, FlowId};
69
70mod ckpt;
71mod inc;
72
/// Immutable configuration of one batching-mode flow task.
///
/// Built once by [`BatchingTask::try_new`] and shared behind an `Arc`.
#[derive(Clone)]
pub struct TaskConfig {
    /// Identifier of the flow this task executes.
    pub flow_id: FlowId,
    /// Text of the flow query; it is re-planned from this string on every run.
    pub query: String,
    /// Schema of the logical plan that was supplied when the task was created.
    pub output_schema: DFSchemaRef,
    /// Time-window expression of the query, when it has one.
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Retention horizon; this file interprets it as a number of seconds before "now".
    pub expire_after: Option<i64>,
    /// Sink table name; indices 0/1/2 are used as catalog/schema/table.
    pub sink_table_name: [String; 3],
    /// Names of the source tables (presumably the same catalog/schema/table layout — confirm).
    pub source_table_names: HashSet<[String; 3]>,
    /// Catalog manager used to look up table existence and table info.
    pub catalog_manager: CatalogManagerRef,
    /// Whether the query is SQL or TQL, determined once at construction.
    pub query_type: QueryType,
    /// Shared batching-mode options (timeouts, thresholds, experimental switches).
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Fixed evaluation interval; when set, the execution loop ticks on it instead of
    /// computing the next start time from task state.
    pub flow_eval_interval: Option<Duration>,
}
90
91fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
92 let is_tql = is_tql(query_ctx.sql_dialect(), query)
93 .map_err(BoxedError::new)
94 .context(ExternalSnafu)?;
95 Ok(if is_tql {
96 QueryType::Tql
97 } else {
98 QueryType::Sql
99 })
100}
101
102fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
103 options
104 .get(MERGE_MODE_KEY)
105 .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
106 .unwrap_or(false)
107}
108
109fn encode_insert_plan_request(
110 insert_to: TableName,
111 insert_input_plan: &LogicalPlan,
112) -> Result<api::v1::QueryRequest, Error> {
113 let message = DFLogicalSubstraitConvertor {}
114 .encode(insert_input_plan, DefaultSerializer)
115 .context(SubstraitEncodeLogicalPlanSnafu)?;
116 Ok(api::v1::QueryRequest {
117 query: Some(api::v1::query_request::Query::InsertIntoPlan(
118 api::v1::InsertIntoPlan {
119 table_name: Some(insert_to),
120 logical_plan: message.to_vec(),
121 },
122 )),
123 })
124}
125
126fn format_insert_target_columns(plan: &LogicalPlan) -> String {
127 plan.schema()
128 .fields()
129 .iter()
130 .map(|field| quote_identifier(field.name()).to_string())
131 .collect::<Vec<_>>()
132 .join(", ")
133}
134
/// Handle to a batching-mode flow task.
///
/// Cloning is cheap: every field is an `Arc`, so clones share the same
/// configuration, mutable state and execution lock.
#[derive(Clone)]
pub struct BatchingTask {
    /// Immutable task configuration.
    pub config: Arc<TaskConfig>,
    /// Mutable runtime state (dirty windows, query context, shutdown receiver, ...).
    pub state: Arc<RwLock<TaskState>>,
    /// Async mutex taken around each execution so that executions issued through
    /// the `*_serialized*` entry points never overlap.
    execution_lock: Arc<Mutex<()>>,
}
144
/// Arguments consumed by [`BatchingTask::try_new`].
pub struct TaskArgs<'a> {
    /// Identifier of the flow.
    pub flow_id: FlowId,
    /// Text of the flow query; copied into the task configuration.
    pub query: &'a str,
    /// Logical plan of the query; only its schema is retained by the task.
    pub plan: LogicalPlan,
    /// Time-window expression of the query, when it has one.
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Retention horizon (treated as seconds by the task).
    pub expire_after: Option<i64>,
    /// Sink table name as `[catalog, schema, table]`.
    pub sink_table_name: [String; 3],
    /// Source table names; collected into a set by the constructor.
    pub source_table_names: Vec<[String; 3]>,
    /// Query context stored in the task state and used to classify the query.
    pub query_ctx: QueryContextRef,
    /// Catalog manager used for table lookups.
    pub catalog_manager: CatalogManagerRef,
    /// Receiver polled by the execution loop to detect shutdown.
    pub shutdown_rx: oneshot::Receiver<()>,
    /// Shared batching-mode options.
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Optional fixed evaluation interval.
    pub flow_eval_interval: Option<Duration>,
}
160
/// A generated plan together with the bookkeeping needed to execute it and to
/// undo its dirty-window consumption if it fails.
pub struct PlanInfo {
    /// The logical plan to execute.
    pub plan: LogicalPlan,
    /// What to put back into the dirty-window state when the plan cannot be
    /// generated or executed successfully.
    pub dirty_restore: DirtyRestore,
    /// Which kind of query this plan represents.
    pub coverage: QueryCoverage,
}
166
/// Describes which kind of query a generated plan represents.
///
/// The variant decides whether the plan is prepared for incremental execution
/// and which snapshot sequences are sent along with the request.
#[derive(Clone)]
pub enum QueryCoverage {
    /// The flow query without any dirty-window filter.
    UnfilteredFull,
    /// A query scoped by dirty-window filters with no pending fenced repair.
    ScopedBaseRepair,
    /// A query scoped by dirty-window filters while a fenced repair is pending.
    ///
    /// `high` is copied from the pending repair and forwarded as the request's
    /// snapshot sequences (presumably region id -> upper sequence bound — confirm).
    FencedRepairChunk { high: BTreeMap<u64, u64> },
    /// A query meant to be executed in incremental mode.
    IncrementalDelta,
}

impl QueryCoverage {
    /// Returns `true` only for [`QueryCoverage::IncrementalDelta`].
    fn is_incremental_delta(&self) -> bool {
        matches!(self, Self::IncrementalDelta)
    }

    /// Returns the snapshot sequences to send with the query.
    ///
    /// Only [`QueryCoverage::FencedRepairChunk`] carries any; every other variant
    /// yields an empty map. The match is exhaustive on purpose so that adding a
    /// variant forces a decision here instead of silently returning nothing.
    fn snapshot_seqs(&self) -> HashMap<u64, u64> {
        match self {
            Self::FencedRepairChunk { high } => high.iter().map(|(k, v)| (*k, *v)).collect(),
            Self::UnfilteredFull | Self::ScopedBaseRepair | Self::IncrementalDelta => {
                HashMap::new()
            }
        }
    }
}
201
/// What must be handed back to the task state when a generated query does not
/// complete successfully.
pub enum DirtyRestore {
    /// The query consumed specific dirty windows described by a filter; they are
    /// restored through `TaskState::restore_scoped_windows`.
    Scoped(FilterExprInfo),
    /// The query drained the whole dirty-window set; the saved copy is merged
    /// back with `add_dirty_windows`.
    Unscoped(DirtyTimeWindows),
}
214
/// Result of a single execution attempt, keeping the generated query alongside
/// the execution result so callers can log it on failure.
struct ExecuteOnceOutcome {
    /// The query that was generated, or `None` when generation failed or there
    /// was nothing to run.
    new_query: Option<PlanInfo>,
    /// `Ok(Some((affected_rows, elapsed)))` when a query ran, `Ok(None)` when
    /// nothing was executed, `Err` when generation or execution failed.
    result: Result<Option<(usize, Duration)>, Error>,
}
224
225impl BatchingTask {
226 #[allow(clippy::too_many_arguments)]
227 pub fn try_new(
228 TaskArgs {
229 flow_id,
230 query,
231 plan,
232 time_window_expr,
233 expire_after,
234 sink_table_name,
235 source_table_names,
236 query_ctx,
237 catalog_manager,
238 shutdown_rx,
239 batch_opts,
240 flow_eval_interval,
241 }: TaskArgs<'_>,
242 ) -> Result<Self, Error> {
243 let mut state = TaskState::with_dirty_time_windows(
244 query_ctx.clone(),
245 shutdown_rx,
246 DirtyTimeWindows::new(
247 batch_opts.experimental_max_filter_num_per_query,
248 batch_opts.experimental_time_window_merge_threshold,
249 ),
250 );
251 if !batch_opts.experimental_enable_incremental_read {
252 state.disable_incremental();
253 }
254
255 Ok(Self {
256 config: Arc::new(TaskConfig {
257 flow_id,
258 query: query.to_string(),
259 time_window_expr,
260 expire_after,
261 sink_table_name,
262 source_table_names: source_table_names.into_iter().collect(),
263 catalog_manager,
264 output_schema: plan.schema().clone(),
265 query_type: determine_query_type(query, &query_ctx)?,
266 batch_opts,
267 flow_eval_interval,
268 }),
269 state: Arc::new(RwLock::new(state)),
270 execution_lock: Arc::new(Mutex::new(())),
271 })
272 }
273
274 pub fn last_execution_time_millis(&self) -> Option<i64> {
275 self.state.read().unwrap().last_execution_time_millis()
276 }
277
278 pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
282 let now = SystemTime::now();
283 let now = Timestamp::new_second(
284 now.duration_since(UNIX_EPOCH)
285 .expect("Time went backwards")
286 .as_secs() as _,
287 );
288 let lower_bound = self
289 .config
290 .expire_after
291 .map(|e| now.sub_duration(Duration::from_secs(e as _)))
292 .transpose()
293 .map_err(BoxedError::new)
294 .context(ExternalSnafu)?
295 .unwrap_or(Timestamp::new_second(0));
296 debug!(
297 "Flow {} mark range ({:?}, {:?}) as dirty",
298 self.config.flow_id, lower_bound, now
299 );
300 self.state
301 .write()
302 .unwrap()
303 .dirty_time_windows
304 .add_window(lower_bound, Some(now));
305 Ok(())
306 }
307
308 pub async fn check_or_create_sink_table(
310 &self,
311 engine: &QueryEngineRef,
312 frontend_client: &Arc<FrontendClient>,
313 ) -> Result<Option<(usize, Duration)>, Error> {
314 if !self.is_table_exist(&self.config.sink_table_name).await? {
315 let create_table = self.gen_create_table_expr(engine.clone()).await?;
316 info!(
317 "Try creating sink table(if not exists) with expr: {:?}",
318 create_table
319 );
320 self.create_table(frontend_client, create_table).await?;
321 info!(
322 "Sink table {}(if not exists) created",
323 self.config.sink_table_name.join(".")
324 );
325 }
326
327 Ok(None)
328 }
329
330 pub async fn validate_sink_table_schema(&self, engine: &QueryEngineRef) -> Result<(), Error> {
336 let (table, _) = get_table_info_df_schema(
337 self.config.catalog_manager.clone(),
338 self.config.sink_table_name.clone(),
339 )
340 .await?;
341
342 let table_meta = &table.table_info().meta;
343 let merge_mode_last_non_null =
344 is_merge_mode_last_non_null(&table_meta.options.extra_options);
345 let primary_key_indices = table_meta.primary_key_indices.clone();
346 let query_ctx = self.state.read().unwrap().query_ctx.clone();
347
348 gen_plan_with_matching_schema(
349 &self.config.query,
350 query_ctx,
351 engine.clone(),
352 table_meta.schema.clone(),
353 &primary_key_indices,
354 merge_mode_last_non_null,
355 )
356 .await
357 .map(|_| ())
358 }
359
360 async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
361 self.config
362 .catalog_manager
363 .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
364 .await
365 .map_err(BoxedError::new)
366 .context(ExternalSnafu)
367 }
368
369 pub(crate) async fn execute_once_serialized(
370 &self,
371 engine: &QueryEngineRef,
372 frontend_client: &Arc<FrontendClient>,
373 max_window_cnt: Option<usize>,
374 ) -> Result<Option<(usize, Duration)>, Error> {
375 let outcome = self
376 .execute_once_serialized_with_outcome(engine, frontend_client, max_window_cnt)
377 .await;
378 outcome.result
379 }
380
381 async fn execute_once_serialized_with_outcome(
384 &self,
385 engine: &QueryEngineRef,
386 frontend_client: &Arc<FrontendClient>,
387 max_window_cnt: Option<usize>,
388 ) -> ExecuteOnceOutcome {
389 let _execution_guard = self.execution_lock.lock().await;
390 self.execute_once_unlocked(engine, frontend_client, max_window_cnt)
391 .await
392 }
393
394 async fn execute_once_unlocked(
396 &self,
397 engine: &QueryEngineRef,
398 frontend_client: &Arc<FrontendClient>,
399 max_window_cnt: Option<usize>,
400 ) -> ExecuteOnceOutcome {
401 let new_query = match self.gen_insert_plan_unlocked(engine, max_window_cnt).await {
402 Ok(new_query) => new_query,
403 Err(err) => {
404 return ExecuteOnceOutcome {
405 new_query: None,
406 result: Err(err),
407 };
408 }
409 };
410
411 if let Some(new_query) = new_query {
412 debug!("Generate new query: {}", new_query.plan);
413 let res = self
414 .execute_logical_plan_unlocked(
415 frontend_client,
416 &new_query.plan,
417 &new_query.dirty_restore,
418 &new_query.coverage,
419 )
420 .await;
421 if res.is_err() {
422 self.handle_executed_query_failure(Some(&new_query));
423 }
424 ExecuteOnceOutcome {
425 new_query: Some(new_query),
426 result: res,
427 }
428 } else {
429 debug!("Generate no query");
430 ExecuteOnceOutcome {
431 new_query: None,
432 result: Ok(None),
433 }
434 }
435 }
436
437 async fn gen_insert_plan_unlocked(
439 &self,
440 engine: &QueryEngineRef,
441 max_window_cnt: Option<usize>,
442 ) -> Result<Option<PlanInfo>, Error> {
443 let (table, df_schema) = get_table_info_df_schema(
444 self.config.catalog_manager.clone(),
445 self.config.sink_table_name.clone(),
446 )
447 .await?;
448
449 let table_meta = &table.table_info().meta;
450 let merge_mode_last_non_null =
451 is_merge_mode_last_non_null(&table_meta.options.extra_options);
452 let primary_key_indices = table_meta.primary_key_indices.clone();
453
454 let new_query = self
455 .gen_query_with_time_window(
456 engine.clone(),
457 &table.table_info().meta.schema,
458 &primary_key_indices,
459 merge_mode_last_non_null,
460 max_window_cnt,
461 )
462 .await?;
463
464 let Some(new_query) = new_query else {
465 return Ok(None);
466 };
467
468 let table_columns = df_schema
471 .columns()
472 .into_iter()
473 .map(|c| c.name)
474 .collect::<BTreeSet<_>>();
475 for column in new_query.plan.schema().columns() {
476 if !table_columns.contains(column.name()) {
477 self.restore_dirty_windows_after_failure(&new_query);
478 return InvalidQuerySnafu {
479 reason: format!(
480 "Column {} not found in sink table with columns {:?}",
481 column, table_columns
482 ),
483 }
484 .fail();
485 }
486 }
487
488 let table_provider = Arc::new(DfTableProviderAdapter::new(table));
489 let table_source = Arc::new(DefaultTableSource::new(table_provider));
490
491 let plan = LogicalPlan::Dml(DmlStatement::new(
493 datafusion_common::TableReference::Full {
494 catalog: self.config.sink_table_name[0].clone().into(),
495 schema: self.config.sink_table_name[1].clone().into(),
496 table: self.config.sink_table_name[2].clone().into(),
497 },
498 table_source,
499 WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
500 Arc::new(new_query.plan.clone()),
501 ));
502 let insert_into_info = PlanInfo {
503 plan,
504 dirty_restore: new_query.dirty_restore,
505 coverage: new_query.coverage,
506 };
507 let insert_into =
508 match insert_into_info
509 .plan
510 .clone()
511 .recompute_schema()
512 .context(DatafusionSnafu {
513 context: "Failed to recompute schema",
514 }) {
515 Ok(insert_into) => insert_into,
516 Err(err) => {
517 self.restore_dirty_windows_after_failure(&insert_into_info);
518 return Err(err);
519 }
520 };
521
522 Ok(Some(PlanInfo {
523 plan: insert_into,
524 dirty_restore: insert_into_info.dirty_restore,
525 coverage: insert_into_info.coverage,
526 }))
527 }
528
529 pub async fn create_table(
530 &self,
531 frontend_client: &Arc<FrontendClient>,
532 expr: CreateTableExpr,
533 ) -> Result<(), Error> {
534 let catalog = &self.config.sink_table_name[0];
535 let schema = &self.config.sink_table_name[1];
536 frontend_client
537 .create(expr.clone(), catalog, schema)
538 .await?;
539 Ok(())
540 }
541
    /// Executes `plan` on a frontend and folds the outcome into the task state.
    ///
    /// Steps, in order:
    /// 1. Fully qualify every table scan with the sink's catalog and schema.
    /// 2. For `IncrementalDelta` coverage, prepare the incremental form of the plan;
    ///    if that is not possible, restore `dirty_restore` and return `Ok(None)`
    ///    without running anything.
    /// 3. Build the query extensions and derive the checkpoint mode from them.
    /// 4. Encode the request (SQL text, `InsertIntoPlan`, or a raw logical plan) and
    ///    send it.
    /// 5. Record failure/success in the task state and metrics.
    ///
    /// Returns `Ok(Some((affected_rows, elapsed)))` when the query ran successfully.
    ///
    /// # Errors
    /// Propagates plan rewriting, incremental preparation, extension building,
    /// Substrait encoding and frontend execution errors. The caller is expected to
    /// restore dirty windows on error; this function only restores them itself on
    /// the skipped-incremental path.
    async fn execute_logical_plan_unlocked(
        &self,
        frontend_client: &Arc<FrontendClient>,
        plan: &LogicalPlan,
        dirty_restore: &DirtyRestore,
        coverage: &QueryCoverage,
    ) -> Result<Option<(usize, Duration)>, Error> {
        let instant = Instant::now();
        let flow_id = self.config.flow_id;

        debug!(
            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
            self.config.expire_after, &plan
        );

        let catalog = &self.config.sink_table_name[0];
        let schema = &self.config.sink_table_name[1];

        // Resolve every table scan (including those in subqueries) to a full
        // catalog.schema.table reference, defaulting to the sink's catalog/schema.
        let plan = plan
            .clone()
            .transform_down_with_subqueries(|p| {
                if let LogicalPlan::TableScan(mut table_scan) = p {
                    let resolved = table_scan.table_name.resolve(catalog, schema);
                    table_scan.table_name = resolved.into();
                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
                } else {
                    Ok(Transformed::no(p))
                }
            })
            .with_context(|_| DatafusionSnafu {
                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
            })?
            .data;

        // Only incremental-delta queries are rewritten for incremental execution.
        let incremental_plan = if coverage.is_incremental_delta() {
            self.prepare_plan_for_incremental(&plan).await?
        } else {
            None
        };
        let incremental_safe = incremental_plan.is_some();
        if coverage.is_incremental_delta() && !incremental_safe {
            // An incremental query that cannot run incrementally is skipped rather
            // than executed as an unfiltered full snapshot; the dirty signal is put
            // back so a later run can pick it up.
            warn!(
                "Flow {flow_id} skipped unsafe incremental delta fallback; \
                restored dirty signal instead of executing an unfiltered full snapshot"
            );
            self.restore_dirty_windows(dirty_restore);
            return Ok(None);
        }
        let plan = incremental_plan.unwrap_or_else(|| plan.clone());

        let extensions = self
            .build_flow_query_extensions(incremental_safe, coverage.is_incremental_delta())
            .await?;
        let extension_refs = extensions
            .iter()
            .map(|(key, value)| (*key, value.as_str()))
            .collect::<Vec<_>>();
        // The presence of the incremental-mode extension decides the reported mode.
        let query_mode = if extensions
            .iter()
            .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
        {
            CheckpointMode::Incremental
        } else {
            CheckpointMode::FullSnapshot
        };
        Self::record_query_mode(flow_id, query_mode);
        debug!(
            "Flow {flow_id} executing batching query with checkpoint_mode={}, extension_count={}",
            checkpoint_mode_label(query_mode),
            extensions.len()
        );

        let mut peer_desc = None;
        let res = {
            // The timer covers request encoding plus the frontend round trip.
            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
                .with_label_values(&[flow_id.to_string().as_str()])
                .start_timer();

            let req = if let Some((insert_to, insert_input_plan)) =
                breakup_insert_plan(&plan, catalog, schema)
            {
                // Full-snapshot SQL flows that run on an eval interval and have no
                // time window are sent as SQL text when the plan can be unparsed.
                // NOTE(review): the motivation for preferring SQL text here is not
                // visible in this file — confirm before changing the condition.
                if query_mode == CheckpointMode::FullSnapshot
                    && matches!(self.config.query_type, QueryType::Sql)
                    && self.config.flow_eval_interval.is_some()
                    && self.config.time_window_expr.is_none()
                {
                    match df_plan_to_sql(&insert_input_plan) {
                        Ok(select_sql) => {
                            let target_columns = format_insert_target_columns(&insert_input_plan);
                            let sql = format!(
                                "INSERT INTO {} ({}) {}",
                                TableReference::full(
                                    insert_to.catalog_name.as_str(),
                                    insert_to.schema_name.as_str(),
                                    insert_to.table_name.as_str(),
                                )
                                .to_quoted_string(),
                                target_columns,
                                select_sql
                            );
                            api::v1::QueryRequest {
                                query: Some(api::v1::query_request::Query::Sql(sql)),
                            }
                        }
                        Err(err) => {
                            // Unparsing is best-effort: fall back to the encoded plan.
                            warn!(
                                "Failed to unparse full-snapshot SQL flow {} plan; \
                                falling back to InsertIntoPlan: {:?}",
                                flow_id, err
                            );
                            encode_insert_plan_request(insert_to, &insert_input_plan)?
                        }
                    }
                } else {
                    encode_insert_plan_request(insert_to, &insert_input_plan)?
                }
            } else {
                // Not an insert plan: ship the whole logical plan as Substrait bytes.
                let message = DFLogicalSubstraitConvertor {}
                    .encode(&plan, DefaultSerializer)
                    .context(SubstraitEncodeLogicalPlanSnafu)?;

                api::v1::QueryRequest {
                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
                }
            };

            let snapshot_seqs = coverage.snapshot_seqs();
            frontend_client
                .query_with_terminal_metrics(
                    catalog,
                    schema,
                    req,
                    &extension_refs,
                    &snapshot_seqs,
                    &mut peer_desc,
                )
                .await
        };

        let elapsed = instant.elapsed();
        // Fall back to the default peer description when the client reported none.
        let peer_label = peer_desc
            .as_ref()
            .map(ToString::to_string)
            .unwrap_or_else(|| PeerDesc::default().to_string());
        if let Err(err) = &res {
            warn!(
                "Failed to execute Flow {flow_id} on frontend {peer_label}, result: {err:?}, elapsed: {:?} with query: {}",
                elapsed, &plan
            );
            // Keep the write lock scoped to the state update only.
            let decision = {
                let mut state = self.state.write().unwrap();
                let reason = Self::query_failure_reason(err, coverage);
                Self::apply_query_failure_to_state(&mut state, elapsed, coverage, reason)
            };
            if let Some(decision) = decision {
                Self::record_checkpoint_decision(flow_id, decision);
            }
        }

        // Slow queries are reported whether they succeeded or failed.
        if elapsed >= self.config.batch_opts.slow_query_threshold {
            warn!(
                "Flow {flow_id} on frontend {peer_label} executed for {:?} before complete, query: {}",
                elapsed, &plan
            );
            let flow_id = flow_id.to_string();
            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                .with_label_values(&[flow_id.as_str(), peer_label.as_str()])
                .observe(elapsed.as_secs_f64());
        }

        // From here on only the success path remains.
        let res = res?;
        let (affected_rows, _) = res.output.extract_rows_and_cost();
        debug!(
            "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}, watermark: {:?}",
            elapsed,
            res.region_watermark_map()
        );
        METRIC_FLOW_ROWS
            .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
            .inc_by(affected_rows as _);
        let decision = {
            let mut state = self.state.write().unwrap();
            Self::apply_query_result_to_state(&mut state, &res, elapsed, coverage)
        };
        Self::record_checkpoint_decision(flow_id, decision);

        Ok(Some((affected_rows, elapsed)))
    }
744
745 fn restore_dirty_windows(&self, dirty_restore: &DirtyRestore) {
749 match dirty_restore {
750 DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter),
751 DirtyRestore::Unscoped(dirty_windows) => self
752 .state
753 .write()
754 .unwrap()
755 .dirty_time_windows
756 .add_dirty_windows(dirty_windows),
757 }
758 }
759
760 fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) {
763 self.restore_dirty_windows(&query.dirty_restore);
764 }
765
766 fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) {
769 self.state.write().unwrap().restore_scoped_windows(filter);
770 }
771
772 fn restore_scoped_dirty_windows_on_err<T>(
775 &self,
776 filter: &FilterExprInfo,
777 result: Result<T, Error>,
778 ) -> Result<T, Error> {
779 result.inspect_err(|_| {
780 self.restore_scoped_dirty_windows(filter);
781 })
782 }
783
784 fn restore_unscoped_dirty_windows(&self, dirty_windows: &DirtyTimeWindows) {
787 self.state
788 .write()
789 .unwrap()
790 .dirty_time_windows
791 .add_dirty_windows(dirty_windows);
792 }
793
794 fn restore_unscoped_dirty_windows_on_err<T>(
797 &self,
798 dirty_windows: &DirtyTimeWindows,
799 result: Result<T, Error>,
800 ) -> Result<T, Error> {
801 result.inspect_err(|_| {
802 self.restore_unscoped_dirty_windows(dirty_windows);
803 })
804 }
805
806 fn drain_dirty_windows_signal(&self) -> (bool, DirtyTimeWindows) {
809 let mut state = self.state.write().unwrap();
810 let dirty_windows_to_restore = state.dirty_time_windows.clone();
811 let is_dirty = !dirty_windows_to_restore.is_empty();
812 state.dirty_time_windows.clean();
813 (is_dirty, dirty_windows_to_restore)
814 }
815
816 #[allow(clippy::too_many_arguments)]
817 async fn gen_unfiltered_plan_info(
820 &self,
821 engine: QueryEngineRef,
822 query_ctx: QueryContextRef,
823 sink_table_schema: Arc<Schema>,
824 primary_key_indices: &[usize],
825 allow_partial: bool,
826 dirty_windows_to_restore: DirtyTimeWindows,
827 retention_filter: Option<(&str, Timestamp, &'static str)>,
828 coverage: QueryCoverage,
829 ) -> Result<PlanInfo, Error> {
830 let mut plan = self.restore_unscoped_dirty_windows_on_err(
831 &dirty_windows_to_restore,
832 gen_plan_with_matching_schema(
833 &self.config.query,
834 query_ctx,
835 engine,
836 sink_table_schema,
837 primary_key_indices,
838 allow_partial,
839 )
840 .await,
841 )?;
842
843 if let Some((col_name, lower_bound, context)) = retention_filter {
844 let lower = self.restore_unscoped_dirty_windows_on_err(
845 &dirty_windows_to_restore,
846 to_df_literal(lower_bound),
847 )?;
848 let retention_filter = col(col_name).gt_eq(lit(lower));
849 let mut add_filter = AddFilterRewriter::new(retention_filter);
850 plan = self.restore_unscoped_dirty_windows_on_err(
851 &dirty_windows_to_restore,
852 plan.clone()
853 .rewrite(&mut add_filter)
854 .with_context(|_| DatafusionSnafu {
855 context: format!(
856 "Failed to apply {context} expire_after filter to plan:\n {}\n",
857 plan
858 ),
859 })
860 .map(|rewrite| rewrite.data),
861 )?;
862 }
863
864 Ok(PlanInfo {
865 plan,
866 dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
867 coverage,
868 })
869 }
870
871 #[allow(clippy::too_many_arguments)]
872 async fn gen_unfiltered_plan_info_if_dirty(
875 &self,
876 engine: QueryEngineRef,
877 query_ctx: QueryContextRef,
878 sink_table_schema: Arc<Schema>,
879 primary_key_indices: &[usize],
880 allow_partial: bool,
881 retention_filter: Option<(&str, Timestamp, &'static str)>,
882 coverage: QueryCoverage,
883 ) -> Result<Option<PlanInfo>, Error> {
884 let (is_dirty, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
885 if !is_dirty {
886 debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
887 return Ok(None);
888 }
889
890 self.gen_unfiltered_plan_info(
891 engine,
892 query_ctx,
893 sink_table_schema,
894 primary_key_indices,
895 allow_partial,
896 dirty_windows_to_restore,
897 retention_filter,
898 coverage,
899 )
900 .await
901 .map(Some)
902 }
903
904 fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) {
905 if let Some(query) = query {
906 self.restore_dirty_windows_after_failure(query);
907 }
908 }
909
    /// Runs the task until a shutdown signal arrives.
    ///
    /// Each iteration checks the shutdown receiver, executes one serialized run,
    /// and then waits:
    /// * after a successful run — for the next eval-interval tick, or until the
    ///   start time computed from the task state when no interval is configured;
    /// * when there was nothing to run, or after an error — for
    ///   `experimental_min_refresh_duration`.
    ///
    /// # Panics
    /// Panics if the state lock is poisoned.
    pub async fn start_executing_loop(
        &self,
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
        let flow_id_str = self.config.flow_id.to_string();
        // `None` means "no explicit cap"; set to `Some(1)` after a failed query and
        // grown by one after each later success.
        let mut max_window_cnt = None;
        let mut interval = self
            .config
            .flow_eval_interval
            .map(|d| tokio::time::interval(d));
        if let Some(tick) = &mut interval {
            // A tokio interval's first tick completes immediately; consume it so
            // later ticks wait a full period.
            tick.tick().await;
        }
        loop {
            {
                // Scope the write lock so it is released before any `.await`.
                let mut state = self.state.write().unwrap();
                match state.shutdown_rx.try_recv() {
                    Ok(()) => break,
                    Err(TryRecvError::Closed) => {
                        // The sender was dropped without sending: stop as well.
                        warn!(
                            "Unexpected shutdown flow {}, shutdown anyway",
                            self.config.flow_id
                        );
                        break;
                    }
                    Err(TryRecvError::Empty) => (),
                }
            }
            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
                .with_label_values(&[&flow_id_str])
                .inc();

            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;

            let outcome = self
                .execute_once_serialized_with_outcome(&engine, &frontend_client, max_window_cnt)
                .await;

            match outcome.result {
                Ok(Some(_)) => {
                    // A query ran successfully: relax the window cap by one, never
                    // beyond the configured maximum.
                    max_window_cnt = max_window_cnt.map(|cnt| {
                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
                    });

                    if let Some(eval_interval) = &mut interval {
                        // Fixed cadence: wait for the next tick.
                        eval_interval.tick().await;
                    } else {
                        // Dynamic cadence: ask the state when to start next. The
                        // guard is dropped at the end of this block, before sleeping.
                        let sleep_until = {
                            let state = self.state.write().unwrap();

                            let time_window_size = self
                                .config
                                .time_window_expr
                                .as_ref()
                                .and_then(|t| *t.time_window_size());

                            let prefer_short_incremental_cadence = state.checkpoint_mode()
                                == CheckpointMode::Incremental
                                && !state.is_incremental_disabled();

                            state.get_next_start_query_time(
                                self.config.flow_id,
                                &time_window_size,
                                min_refresh,
                                Some(self.config.batch_opts.query_timeout),
                                self.config.batch_opts.experimental_max_filter_num_per_query,
                                prefer_short_incremental_cadence,
                            )
                        };

                        tokio::time::sleep_until(sleep_until).await;
                    };
                }
                Ok(None) => {
                    // Nothing was executed this round.
                    debug!(
                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
                        self.config.flow_id, min_refresh
                    );
                    tokio::time::sleep(min_refresh).await;
                    continue;
                }
                Err(err) => {
                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                        .with_label_values(&[&flow_id_str])
                        .inc();
                    match outcome.new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                            // Retry with a single window after a failed execution.
                            max_window_cnt = Some(1);
                        }
                        None => {
                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
                        }
                    }
                    // Back off before retrying.
                    tokio::time::sleep(min_refresh).await;
                }
            }
        }
    }
1027
1028 async fn gen_create_table_expr(
1035 &self,
1036 engine: QueryEngineRef,
1037 ) -> Result<CreateTableExpr, Error> {
1038 let query_ctx = self.state.read().unwrap().query_ctx.clone();
1039 let plan =
1040 sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
1041 create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
1042 }
1043
1044 fn should_use_unfiltered_incremental_delta(&self) -> bool {
1047 let state = self.state.read().unwrap();
1048 state.checkpoint_mode() == CheckpointMode::Incremental
1049 && !state.is_incremental_disabled()
1050 && matches!(self.config.query_type, QueryType::Sql)
1051 }
1052
1053 async fn gen_query_with_time_window(
1056 &self,
1057 engine: QueryEngineRef,
1058 sink_table_schema: &Arc<Schema>,
1059 primary_key_indices: &[usize],
1060 allow_partial: bool,
1061 max_window_cnt: Option<usize>,
1062 ) -> Result<Option<PlanInfo>, Error> {
1063 let query_ctx = self.state.read().unwrap().query_ctx.clone();
1064 let start = SystemTime::now();
1065 let since_the_epoch = start
1066 .duration_since(UNIX_EPOCH)
1067 .expect("Time went backwards");
1068 let low_bound = self
1069 .config
1070 .expire_after
1071 .map(|e| since_the_epoch.as_secs() - e as u64)
1072 .unwrap_or(u64::MIN);
1073
1074 let low_bound = Timestamp::new_second(low_bound as i64);
1075
1076 let expire_time_window_bound = self
1077 .config
1078 .time_window_expr
1079 .as_ref()
1080 .map(|expr| expr.eval(low_bound))
1081 .transpose()?;
1082
1083 let (expire_lower_bound, expire_upper_bound) = match (
1084 expire_time_window_bound,
1085 &self.config.query_type,
1086 ) {
1087 (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
1088 (None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => {
1089 return UnexpectedSnafu {
1090 reason: format!(
1091 "Flow id={} reached runtime without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it",
1092 self.config.flow_id
1093 ),
1094 }
1095 .fail();
1096 }
1097 _ => {
1098 let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
1104
1105 let plan_info = self
1106 .gen_unfiltered_plan_info(
1107 engine,
1108 query_ctx,
1109 sink_table_schema.clone(),
1110 primary_key_indices,
1111 allow_partial,
1112 dirty_windows_to_restore,
1113 None,
1114 QueryCoverage::UnfilteredFull,
1115 )
1116 .await?;
1117
1118 return Ok(Some(plan_info));
1119 }
1120 };
1121
1122 debug!(
1123 "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
1124 self.config.flow_id,
1125 expire_lower_bound,
1126 expire_upper_bound,
1127 self.state.read().unwrap().dirty_time_windows
1128 );
1129 let window_size = expire_upper_bound
1130 .sub(&expire_lower_bound)
1131 .with_context(|| UnexpectedSnafu {
1132 reason: format!(
1133 "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
1134 ),
1135 })?;
1136 let col_name = self
1137 .config
1138 .time_window_expr
1139 .as_ref()
1140 .map(|expr| expr.column_name.clone())
1141 .with_context(|| UnexpectedSnafu {
1142 reason: format!(
1143 "Flow id={:?}, Failed to get column name from time window expr",
1144 self.config.flow_id
1145 ),
1146 })?;
1147
1148 if self.should_use_unfiltered_incremental_delta() {
1149 let retention_filter = self
1159 .config
1160 .expire_after
1161 .map(|_| (col_name.as_str(), expire_lower_bound, "incremental"));
1162 return self
1163 .gen_unfiltered_plan_info_if_dirty(
1164 engine,
1165 query_ctx,
1166 sink_table_schema.clone(),
1167 primary_key_indices,
1168 allow_partial,
1169 retention_filter,
1170 QueryCoverage::IncrementalDelta,
1171 )
1172 .await;
1173 }
1174
1175 let (expr, coverage) = {
1176 let mut state = self.state.write().unwrap();
1177 let window_cnt = max_window_cnt
1178 .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query);
1179 let expr = state.gen_scoped_filter_exprs(
1180 &col_name,
1181 Some(expire_lower_bound),
1182 window_size,
1183 window_cnt,
1184 self.config.flow_id,
1185 Some(self),
1186 )?;
1187 let repair_high = state
1188 .pending_fenced_repair()
1189 .map(|repair| repair.high().clone());
1190 let coverage = if let Some(high) = repair_high {
1191 QueryCoverage::FencedRepairChunk { high }
1192 } else {
1193 QueryCoverage::ScopedBaseRepair
1194 };
1195 (expr, coverage)
1196 };
1197
1198 let Some(expr) = expr else {
1199 debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
1201 return Ok(None);
1202 };
1203
1204 let filter_sql = expr_to_sql(&expr.expr)
1205 .map(|sql| sql.to_string())
1206 .unwrap_or_else(|err| format!("<failed to format filter expr: {err}>"));
1207
1208 debug!(
1209 "Flow id={:?}, Generated filter expr: {:?}",
1210 self.config.flow_id, filter_sql
1211 );
1212
1213 let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
1214 let mut add_auto_column = ColumnMatcherRewriter::new(
1215 sink_table_schema.clone(),
1216 primary_key_indices.to_vec(),
1217 allow_partial,
1218 );
1219
1220 let plan = self.restore_scoped_dirty_windows_on_err(
1221 &expr,
1222 sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await,
1223 )?;
1224 let rewrite = self.restore_scoped_dirty_windows_on_err(
1225 &expr,
1226 plan.clone()
1227 .rewrite(&mut add_filter)
1228 .and_then(|p| p.data.rewrite(&mut add_auto_column))
1229 .with_context(|_| DatafusionSnafu {
1230 context: format!("Failed to rewrite plan:\n {}\n", plan),
1231 })
1232 .map(|rewrite| rewrite.data),
1233 )?;
1234 let new_plan = self.restore_scoped_dirty_windows_on_err(
1236 &expr,
1237 apply_df_optimizer(rewrite, &query_ctx).await,
1238 )?;
1239
1240 let info = PlanInfo {
1241 plan: new_plan.clone(),
1242 dirty_restore: DirtyRestore::Scoped(expr),
1243 coverage,
1244 };
1245
1246 Ok(Some(info))
1247 }
1248}
1249
1250#[cfg(test)]
1251mod test;