1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::CreateTableExpr;
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::DFSchemaRef;
29use datafusion_common::tree_node::{Transformed, TreeNode};
30use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
31use datatypes::schema::Schema;
32use query::QueryEngineRef;
33use query::options::FLOW_INCREMENTAL_MODE;
34use query::query_engine::DefaultSerializer;
35use session::context::QueryContextRef;
36use snafu::{OptionExt, ResultExt};
37use sql::parsers::utils::is_tql;
38use store_api::mito_engine_options::MERGE_MODE_KEY;
39use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
40use table::table::adapter::DfTableProviderAdapter;
41use tokio::sync::oneshot;
42use tokio::sync::oneshot::error::TryRecvError;
43use tokio::time::Instant;
44
45use crate::batching_mode::BatchingModeOptions;
46use crate::batching_mode::checkpoint::checkpoint_mode_label;
47use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc};
48use crate::batching_mode::state::{CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState};
49use crate::batching_mode::table_creator::{QueryType, create_table_with_expr};
50use crate::batching_mode::time_window::TimeWindowExpr;
51use crate::batching_mode::utils::{
52 AddFilterRewriter, ColumnMatcherRewriter, gen_plan_with_matching_schema,
53 get_table_info_df_schema, sql_to_df_plan,
54};
55use crate::df_optimizer::apply_df_optimizer;
56use crate::error::{
57 DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, SubstraitEncodeLogicalPlanSnafu,
58 UnexpectedSnafu,
59};
60use crate::metrics::{
61 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
62 METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
63 METRIC_FLOW_ROWS,
64};
65use crate::{Error, FlowId};
66
67mod ckpt;
68mod inc;
69
/// Number of dirty-window filters requested from `DirtyTimeWindows::gen_filter_exprs` when the
/// task is a SQL flow running in incremental checkpoint mode (and incremental mode has not been
/// disabled); it replaces the regular per-query window count in that case.
const MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS: usize = 4096;
75
/// Configuration of a batching-mode flow task.
///
/// Populated once by `BatchingTask::try_new` and then shared behind an `Arc`.
#[derive(Clone)]
pub struct TaskConfig {
    /// Identifier of the flow this task executes.
    pub flow_id: FlowId,
    /// Query text of the flow, copied from the arguments given to `try_new`.
    pub query: String,
    /// Schema of the logical plan that was supplied when the task was built.
    pub output_schema: DFSchemaRef,
    /// Optional time-window expression; when present it is evaluated to obtain window bounds
    /// and provides the column name used in dirty-window filters.
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Optional expiration horizon; this file subtracts it from "now" as a number of seconds.
    pub expire_after: Option<i64>,
    /// Sink table name as `[catalog, schema, table]`.
    pub sink_table_name: [String; 3],
    /// Names of the source tables (presumably also `[catalog, schema, table]` — confirm).
    pub source_table_names: HashSet<[String; 3]>,
    /// Catalog manager used to check table existence and to look up the sink table.
    pub catalog_manager: CatalogManagerRef,
    /// Whether the flow query was detected as TQL or SQL.
    pub query_type: QueryType,
    /// Batching-mode options (slow-query threshold, refresh duration, filter limits, ...).
    pub batch_opts: Arc<BatchingModeOptions>,
    /// When set, the execution loop waits on a fixed interval of this length between
    /// successful runs instead of computing the next start time from task state.
    pub flow_eval_interval: Option<Duration>,
}
93
94fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
95 let is_tql = is_tql(query_ctx.sql_dialect(), query)
96 .map_err(BoxedError::new)
97 .context(ExternalSnafu)?;
98 Ok(if is_tql {
99 QueryType::Tql
100 } else {
101 QueryType::Sql
102 })
103}
104
105fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
106 options
107 .get(MERGE_MODE_KEY)
108 .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
109 .unwrap_or(false)
110}
111
/// A batching-mode flow task: shared configuration plus shared mutable runtime state.
///
/// Cloning is cheap; both fields are `Arc`s, so clones refer to the same config and state.
#[derive(Clone)]
pub struct BatchingTask {
    /// Task configuration, set at construction.
    pub config: Arc<TaskConfig>,
    /// Runtime state (query context, shutdown receiver, dirty time windows, ...) guarded by
    /// a std `RwLock`.
    pub state: Arc<RwLock<TaskState>>,
}
117
/// Arguments consumed by `BatchingTask::try_new`.
pub struct TaskArgs<'a> {
    /// Identifier of the flow.
    pub flow_id: FlowId,
    /// Query text of the flow; copied into the task config.
    pub query: &'a str,
    /// Logical plan of the query; `try_new` only keeps its schema.
    pub plan: LogicalPlan,
    /// Optional time-window expression of the query.
    pub time_window_expr: Option<TimeWindowExpr>,
    /// Optional expiration horizon (treated as seconds elsewhere in this file).
    pub expire_after: Option<i64>,
    /// Sink table name as `[catalog, schema, table]`.
    pub sink_table_name: [String; 3],
    /// Source table names; collected into a set by `try_new`.
    pub source_table_names: Vec<[String; 3]>,
    /// Query context; stored in the task state and also used to detect the query type.
    pub query_ctx: QueryContextRef,
    /// Catalog manager handed to the task config.
    pub catalog_manager: CatalogManagerRef,
    /// Receiver polled by the execution loop to detect shutdown.
    pub shutdown_rx: oneshot::Receiver<()>,
    /// Batching-mode options.
    pub batch_opts: Arc<BatchingModeOptions>,
    /// Optional fixed evaluation interval for the execution loop.
    pub flow_eval_interval: Option<Duration>,
}
133
/// A generated plan together with the bookkeeping needed to run it and to undo its
/// dirty-window consumption if it fails.
pub struct PlanInfo {
    /// The logical plan to execute.
    pub plan: LogicalPlan,
    /// What to put back into the task's dirty time windows if the plan fails.
    pub dirty_restore: DirtyRestore,
    /// Forwarded to `execute_logical_plan`; when `false` no incremental plan is prepared.
    pub can_advance_checkpoints: bool,
}
139
/// Describes which dirty windows must be re-added to the task state after a failed query.
pub enum DirtyRestore {
    /// The query was restricted by a generated filter; on failure the filter's
    /// `time_ranges` are re-added.
    Scoped(FilterExprInfo),
    /// The query ran without a dirty-window filter; holds the snapshot of the dirty windows
    /// taken before they were cleaned, which is re-added wholesale on failure.
    Unscoped(DirtyTimeWindows),
}
152
153impl BatchingTask {
154 #[allow(clippy::too_many_arguments)]
155 pub fn try_new(
156 TaskArgs {
157 flow_id,
158 query,
159 plan,
160 time_window_expr,
161 expire_after,
162 sink_table_name,
163 source_table_names,
164 query_ctx,
165 catalog_manager,
166 shutdown_rx,
167 batch_opts,
168 flow_eval_interval,
169 }: TaskArgs<'_>,
170 ) -> Result<Self, Error> {
171 Ok(Self {
172 config: Arc::new(TaskConfig {
173 flow_id,
174 query: query.to_string(),
175 time_window_expr,
176 expire_after,
177 sink_table_name,
178 source_table_names: source_table_names.into_iter().collect(),
179 catalog_manager,
180 output_schema: plan.schema().clone(),
181 query_type: determine_query_type(query, &query_ctx)?,
182 batch_opts,
183 flow_eval_interval,
184 }),
185 state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
186 })
187 }
188
189 pub fn last_execution_time_millis(&self) -> Option<i64> {
190 self.state.read().unwrap().last_execution_time_millis()
191 }
192
193 pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
197 let now = SystemTime::now();
198 let now = Timestamp::new_second(
199 now.duration_since(UNIX_EPOCH)
200 .expect("Time went backwards")
201 .as_secs() as _,
202 );
203 let lower_bound = self
204 .config
205 .expire_after
206 .map(|e| now.sub_duration(Duration::from_secs(e as _)))
207 .transpose()
208 .map_err(BoxedError::new)
209 .context(ExternalSnafu)?
210 .unwrap_or(Timestamp::new_second(0));
211 debug!(
212 "Flow {} mark range ({:?}, {:?}) as dirty",
213 self.config.flow_id, lower_bound, now
214 );
215 self.state
216 .write()
217 .unwrap()
218 .dirty_time_windows
219 .add_window(lower_bound, Some(now));
220 Ok(())
221 }
222
223 pub async fn check_or_create_sink_table(
225 &self,
226 engine: &QueryEngineRef,
227 frontend_client: &Arc<FrontendClient>,
228 ) -> Result<Option<(usize, Duration)>, Error> {
229 if !self.is_table_exist(&self.config.sink_table_name).await? {
230 let create_table = self.gen_create_table_expr(engine.clone()).await?;
231 info!(
232 "Try creating sink table(if not exists) with expr: {:?}",
233 create_table
234 );
235 self.create_table(frontend_client, create_table).await?;
236 info!(
237 "Sink table {}(if not exists) created",
238 self.config.sink_table_name.join(".")
239 );
240 }
241
242 Ok(None)
243 }
244
245 async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
246 self.config
247 .catalog_manager
248 .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
249 .await
250 .map_err(BoxedError::new)
251 .context(ExternalSnafu)
252 }
253
254 pub async fn gen_exec_once(
255 &self,
256 engine: &QueryEngineRef,
257 frontend_client: &Arc<FrontendClient>,
258 max_window_cnt: Option<usize>,
259 ) -> Result<Option<(usize, Duration)>, Error> {
260 if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
261 debug!("Generate new query: {}", new_query.plan);
262 let dirty_filter = match &new_query.dirty_restore {
263 DirtyRestore::Scoped(f) => Some(f),
264 _ => None,
265 };
266 match self
267 .execute_logical_plan(
268 frontend_client,
269 &new_query.plan,
270 dirty_filter,
271 new_query.can_advance_checkpoints,
272 )
273 .await
274 {
275 Ok(result) => Ok(result),
276 Err(err) => {
277 self.handle_executed_query_failure(Some(&new_query));
278 Err(err)
279 }
280 }
281 } else {
282 debug!("Generate no query");
283 Ok(None)
284 }
285 }
286
287 pub async fn gen_insert_plan(
288 &self,
289 engine: &QueryEngineRef,
290 max_window_cnt: Option<usize>,
291 ) -> Result<Option<PlanInfo>, Error> {
292 let (table, df_schema) = get_table_info_df_schema(
293 self.config.catalog_manager.clone(),
294 self.config.sink_table_name.clone(),
295 )
296 .await?;
297
298 let table_meta = &table.table_info().meta;
299 let merge_mode_last_non_null =
300 is_merge_mode_last_non_null(&table_meta.options.extra_options);
301 let primary_key_indices = table_meta.primary_key_indices.clone();
302
303 let new_query = self
304 .gen_query_with_time_window(
305 engine.clone(),
306 &table.table_info().meta.schema,
307 &primary_key_indices,
308 merge_mode_last_non_null,
309 max_window_cnt,
310 )
311 .await?;
312
313 let Some(new_query) = new_query else {
314 return Ok(None);
315 };
316
317 let table_columns = df_schema
320 .columns()
321 .into_iter()
322 .map(|c| c.name)
323 .collect::<BTreeSet<_>>();
324 for column in new_query.plan.schema().columns() {
325 if !table_columns.contains(column.name()) {
326 self.restore_dirty_windows_after_failure(&new_query);
327 return InvalidQuerySnafu {
328 reason: format!(
329 "Column {} not found in sink table with columns {:?}",
330 column, table_columns
331 ),
332 }
333 .fail();
334 }
335 }
336
337 let table_provider = Arc::new(DfTableProviderAdapter::new(table));
338 let table_source = Arc::new(DefaultTableSource::new(table_provider));
339
340 let plan = LogicalPlan::Dml(DmlStatement::new(
342 datafusion_common::TableReference::Full {
343 catalog: self.config.sink_table_name[0].clone().into(),
344 schema: self.config.sink_table_name[1].clone().into(),
345 table: self.config.sink_table_name[2].clone().into(),
346 },
347 table_source,
348 WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
349 Arc::new(new_query.plan.clone()),
350 ));
351 let insert_into_info = PlanInfo {
352 plan,
353 dirty_restore: new_query.dirty_restore,
354 can_advance_checkpoints: new_query.can_advance_checkpoints,
355 };
356 let insert_into =
357 match insert_into_info
358 .plan
359 .clone()
360 .recompute_schema()
361 .context(DatafusionSnafu {
362 context: "Failed to recompute schema",
363 }) {
364 Ok(insert_into) => insert_into,
365 Err(err) => {
366 self.restore_dirty_windows_after_failure(&insert_into_info);
367 return Err(err);
368 }
369 };
370
371 Ok(Some(PlanInfo {
372 plan: insert_into,
373 dirty_restore: insert_into_info.dirty_restore,
374 can_advance_checkpoints: insert_into_info.can_advance_checkpoints,
375 }))
376 }
377
378 pub async fn create_table(
379 &self,
380 frontend_client: &Arc<FrontendClient>,
381 expr: CreateTableExpr,
382 ) -> Result<(), Error> {
383 let catalog = &self.config.sink_table_name[0];
384 let schema = &self.config.sink_table_name[1];
385 frontend_client
386 .create(expr.clone(), catalog, schema)
387 .await?;
388 Ok(())
389 }
390
/// Executes `plan` on a frontend and records the outcome in the task state and metrics.
///
/// * `plan` - plan to run; its table scans are re-resolved against the sink table's
///   catalog and schema before it is sent.
/// * `dirty_filter` - filter info handed to `prepare_plan_for_incremental`.
/// * `can_advance_checkpoints` - when `false`, no incremental plan is prepared; the flag is
///   also forwarded to the extension builder and to the state update on success.
///
/// On success resolves to `Ok(Some((affected_rows, elapsed)))`.
///
/// # Errors
/// Propagates plan-rewrite, incremental-preparation, extension-building, Substrait-encoding
/// and frontend query errors. For a frontend query error the failure is first applied to
/// the task state. This function does not restore dirty windows itself; the callers in
/// this file do that through `handle_executed_query_failure`.
pub async fn execute_logical_plan(
    &self,
    frontend_client: &Arc<FrontendClient>,
    plan: &LogicalPlan,
    dirty_filter: Option<&FilterExprInfo>,
    can_advance_checkpoints: bool,
) -> Result<Option<(usize, Duration)>, Error> {
    let instant = Instant::now();
    let flow_id = self.config.flow_id;

    debug!(
        "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
        self.config.expire_after, &plan
    );

    let catalog = &self.config.sink_table_name[0];
    let schema = &self.config.sink_table_name[1];

    // Resolve every table scan's name (including those in subqueries) using the sink
    // table's catalog and schema as defaults.
    let plan = plan
        .clone()
        .transform_down_with_subqueries(|p| {
            if let LogicalPlan::TableScan(mut table_scan) = p {
                let resolved = table_scan.table_name.resolve(catalog, schema);
                table_scan.table_name = resolved.into();
                Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
            } else {
                Ok(Transformed::no(p))
            }
        })
        .with_context(|_| DatafusionSnafu {
            context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
        })?
        .data;

    // An incremental variant of the plan is only attempted when checkpoints may advance.
    let incremental_plan = if can_advance_checkpoints {
        self.prepare_plan_for_incremental(&plan, dirty_filter)
            .await?
    } else {
        None
    };
    let incremental_safe = incremental_plan.is_some();
    // Fall back to the resolved plan when no incremental plan was produced.
    let plan = incremental_plan.unwrap_or_else(|| plan.clone());

    let extensions = self
        .build_flow_query_extensions(incremental_safe, can_advance_checkpoints)
        .await?;
    let extension_refs = extensions
        .iter()
        .map(|(key, value)| (*key, value.as_str()))
        .collect::<Vec<_>>();
    // The query counts as incremental iff the incremental-mode extension key is present.
    let query_mode = if extensions
        .iter()
        .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
    {
        CheckpointMode::Incremental
    } else {
        CheckpointMode::FullSnapshot
    };
    Self::record_query_mode(flow_id, query_mode);
    debug!(
        "Flow {flow_id} executing batching query with checkpoint_mode={}, extension_count={}",
        checkpoint_mode_label(query_mode),
        extensions.len()
    );

    let mut peer_desc = None;
    let res = {
        // The timer is dropped at the end of this block, so it covers plan encoding and
        // the frontend round trip.
        let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
            .with_label_values(&[flow_id.to_string().as_str()])
            .start_timer();

        // When the plan can be split into (target table, input plan) it is sent as an
        // insert-into-plan request; otherwise the whole plan is sent as a logical plan.
        let req = if let Some((insert_to, insert_plan)) =
            breakup_insert_plan(&plan, catalog, schema)
        {
            let message = DFLogicalSubstraitConvertor {}
                .encode(&insert_plan, DefaultSerializer)
                .context(SubstraitEncodeLogicalPlanSnafu)?;
            api::v1::QueryRequest {
                query: Some(api::v1::query_request::Query::InsertIntoPlan(
                    api::v1::InsertIntoPlan {
                        table_name: Some(insert_to),
                        logical_plan: message.to_vec(),
                    },
                )),
            }
        } else {
            let message = DFLogicalSubstraitConvertor {}
                .encode(&plan, DefaultSerializer)
                .context(SubstraitEncodeLogicalPlanSnafu)?;

            api::v1::QueryRequest {
                query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
            }
        };

        frontend_client
            .query_with_terminal_metrics(catalog, schema, req, &extension_refs, &mut peer_desc)
            .await
    };

    let elapsed = instant.elapsed();
    // Use the default peer description when the client did not report one.
    let peer_label = peer_desc
        .as_ref()
        .map(ToString::to_string)
        .unwrap_or_else(|| PeerDesc::default().to_string());
    if let Err(err) = &res {
        warn!(
            "Failed to execute Flow {flow_id} on frontend {peer_label}, result: {err:?}, elapsed: {:?} with query: {}",
            elapsed, &plan
        );
        // The state lock is released before the decision is recorded.
        let decision = {
            let mut state = self.state.write().unwrap();
            let reason = Self::query_failure_reason(err);
            Self::apply_query_failure_to_state(&mut state, elapsed, reason)
        };
        if let Some(decision) = decision {
            Self::record_checkpoint_decision(flow_id, decision);
        }
    }

    // Slow queries are reported for failed and successful executions alike, because the
    // error is only propagated further below.
    if elapsed >= self.config.batch_opts.slow_query_threshold {
        warn!(
            "Flow {flow_id} on frontend {peer_label} executed for {:?} before complete, query: {}",
            elapsed, &plan
        );
        let flow_id = flow_id.to_string();
        METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
            .with_label_values(&[flow_id.as_str(), peer_label.as_str()])
            .observe(elapsed.as_secs_f64());
    }

    let res = res?;
    let (affected_rows, _) = res.output.extract_rows_and_cost();
    debug!(
        "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}, watermark: {:?}",
        elapsed,
        res.region_watermark_map()
    );
    METRIC_FLOW_ROWS
        .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
        .inc_by(affected_rows as _);
    {
        let mut state = self.state.write().unwrap();
        let decision = Self::apply_query_result_to_state(
            &mut state,
            &res,
            elapsed,
            can_advance_checkpoints,
        );
        Self::record_checkpoint_decision(flow_id, decision);
    }

    Ok(Some((affected_rows, elapsed)))
}
549
550 fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) {
554 match &query.dirty_restore {
555 DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter),
556 DirtyRestore::Unscoped(dirty_windows) => self
557 .state
558 .write()
559 .unwrap()
560 .dirty_time_windows
561 .add_dirty_windows(dirty_windows),
562 }
563 }
564
565 fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) {
566 self.state
567 .write()
568 .unwrap()
569 .dirty_time_windows
570 .add_windows(filter.time_ranges.clone());
571 }
572
573 fn restore_scoped_dirty_windows_on_err<T>(
574 &self,
575 filter: &FilterExprInfo,
576 result: Result<T, Error>,
577 ) -> Result<T, Error> {
578 result.inspect_err(|_| {
579 self.restore_scoped_dirty_windows(filter);
580 })
581 }
582
583 fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) {
584 if let Some(query) = query {
585 self.restore_dirty_windows_after_failure(query);
586 }
587 }
588
589 pub async fn start_executing_loop(
593 &self,
594 engine: QueryEngineRef,
595 frontend_client: Arc<FrontendClient>,
596 ) {
597 let flow_id_str = self.config.flow_id.to_string();
598 let mut max_window_cnt = None;
599 let mut interval = self
600 .config
601 .flow_eval_interval
602 .map(|d| tokio::time::interval(d));
603 if let Some(tick) = &mut interval {
604 tick.tick().await; }
606 loop {
607 {
610 let mut state = self.state.write().unwrap();
611 match state.shutdown_rx.try_recv() {
612 Ok(()) => break,
613 Err(TryRecvError::Closed) => {
614 warn!(
615 "Unexpected shutdown flow {}, shutdown anyway",
616 self.config.flow_id
617 );
618 break;
619 }
620 Err(TryRecvError::Empty) => (),
621 }
622 }
623 METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
624 .with_label_values(&[&flow_id_str])
625 .inc();
626
627 let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
628
629 let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
630 Ok(new_query) => new_query,
631 Err(err) => {
632 common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
633 tokio::time::sleep(min_refresh).await;
635 continue;
636 }
637 };
638
639 let res = if let Some(new_query) = &new_query {
640 let dirty_filter = match &new_query.dirty_restore {
641 DirtyRestore::Scoped(f) => Some(f),
642 _ => None,
643 };
644 self.execute_logical_plan(
645 &frontend_client,
646 &new_query.plan,
647 dirty_filter,
648 new_query.can_advance_checkpoints,
649 )
650 .await
651 } else {
652 Ok(None)
653 };
654
655 match res {
656 Ok(Some(_)) => {
658 max_window_cnt = max_window_cnt.map(|cnt| {
660 (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
661 });
662
663 if let Some(eval_interval) = &mut interval {
665 eval_interval.tick().await;
666 } else {
667 let sleep_until = {
670 let state = self.state.write().unwrap();
671
672 let time_window_size = self
673 .config
674 .time_window_expr
675 .as_ref()
676 .and_then(|t| *t.time_window_size());
677
678 let prefer_short_incremental_cadence = state.checkpoint_mode()
679 == CheckpointMode::Incremental
680 && !state.is_incremental_disabled();
681
682 state.get_next_start_query_time(
683 self.config.flow_id,
684 &time_window_size,
685 min_refresh,
686 Some(self.config.batch_opts.query_timeout),
687 self.config.batch_opts.experimental_max_filter_num_per_query,
688 prefer_short_incremental_cadence,
689 )
690 };
691
692 tokio::time::sleep_until(sleep_until).await;
693 };
694 }
695 Ok(None) => {
697 debug!(
698 "Flow id = {:?} found no new data, sleep for {:?} then continue",
699 self.config.flow_id, min_refresh
700 );
701 tokio::time::sleep(min_refresh).await;
702 continue;
703 }
704 Err(err) => {
706 self.handle_executed_query_failure(new_query.as_ref());
707 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
708 .with_label_values(&[&flow_id_str])
709 .inc();
710 match new_query {
711 Some(query) => {
712 common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
713 max_window_cnt = Some(1);
718 }
719 None => {
720 common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
721 }
722 }
723 tokio::time::sleep(min_refresh).await;
725 }
726 }
727 }
728 }
729
730 async fn gen_create_table_expr(
737 &self,
738 engine: QueryEngineRef,
739 ) -> Result<CreateTableExpr, Error> {
740 let query_ctx = self.state.read().unwrap().query_ctx.clone();
741 let plan =
742 sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
743 create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
744 }
745
746 async fn gen_query_with_time_window(
748 &self,
749 engine: QueryEngineRef,
750 sink_table_schema: &Arc<Schema>,
751 primary_key_indices: &[usize],
752 allow_partial: bool,
753 max_window_cnt: Option<usize>,
754 ) -> Result<Option<PlanInfo>, Error> {
755 let query_ctx = self.state.read().unwrap().query_ctx.clone();
756 let start = SystemTime::now();
757 let since_the_epoch = start
758 .duration_since(UNIX_EPOCH)
759 .expect("Time went backwards");
760 let low_bound = self
761 .config
762 .expire_after
763 .map(|e| since_the_epoch.as_secs() - e as u64)
764 .unwrap_or(u64::MIN);
765
766 let low_bound = Timestamp::new_second(low_bound as i64);
767
768 let expire_time_window_bound = self
769 .config
770 .time_window_expr
771 .as_ref()
772 .map(|expr| expr.eval(low_bound))
773 .transpose()?;
774
775 let (expire_lower_bound, expire_upper_bound) =
776 match (expire_time_window_bound, &self.config.query_type) {
777 (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
778 (None, QueryType::Sql) => {
779 debug!(
782 "Flow id = {:?}, no time window, using the same query",
783 self.config.flow_id
784 );
785 let (is_dirty, dirty_windows_to_restore) = {
787 let mut state = self.state.write().unwrap();
788 let dirty_windows_to_restore = state.dirty_time_windows.clone();
789 let is_dirty = !dirty_windows_to_restore.is_empty();
790 state.dirty_time_windows.clean();
791 (is_dirty, dirty_windows_to_restore)
792 };
793
794 if !is_dirty {
795 debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
797 return Ok(None);
798 }
799
800 let plan = match gen_plan_with_matching_schema(
801 &self.config.query,
802 query_ctx,
803 engine,
804 sink_table_schema.clone(),
805 primary_key_indices,
806 allow_partial,
807 )
808 .await
809 {
810 Ok(plan) => plan,
811 Err(err) => {
812 self.state
813 .write()
814 .unwrap()
815 .dirty_time_windows
816 .add_dirty_windows(&dirty_windows_to_restore);
817 return Err(err);
818 }
819 };
820
821 return Ok(Some(PlanInfo {
822 plan,
823 dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
824 can_advance_checkpoints: true,
825 }));
826 }
827 _ => {
828 let dirty_windows_to_restore = {
831 let mut state = self.state.write().unwrap();
832 let dirty_windows_to_restore = state.dirty_time_windows.clone();
833 state.dirty_time_windows.clean();
834 dirty_windows_to_restore
835 };
836
837 let plan = match gen_plan_with_matching_schema(
838 &self.config.query,
839 query_ctx,
840 engine,
841 sink_table_schema.clone(),
842 primary_key_indices,
843 allow_partial,
844 )
845 .await
846 {
847 Ok(plan) => plan,
848 Err(err) => {
849 self.state
850 .write()
851 .unwrap()
852 .dirty_time_windows
853 .add_dirty_windows(&dirty_windows_to_restore);
854 return Err(err);
855 }
856 };
857
858 return Ok(Some(PlanInfo {
859 plan,
860 dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
861 can_advance_checkpoints: true,
862 }));
863 }
864 };
865
866 debug!(
867 "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
868 self.config.flow_id,
869 expire_lower_bound,
870 expire_upper_bound,
871 self.state.read().unwrap().dirty_time_windows
872 );
873 let window_size = expire_upper_bound
874 .sub(&expire_lower_bound)
875 .with_context(|| UnexpectedSnafu {
876 reason: format!(
877 "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
878 ),
879 })?;
880 let col_name = self
881 .config
882 .time_window_expr
883 .as_ref()
884 .map(|expr| expr.column_name.clone())
885 .with_context(|| UnexpectedSnafu {
886 reason: format!(
887 "Flow id={:?}, Failed to get column name from time window expr",
888 self.config.flow_id
889 ),
890 })?;
891
892 let (expr, can_advance_checkpoints) = {
893 let mut state = self.state.write().unwrap();
894 let window_cnt = if state.checkpoint_mode() == CheckpointMode::Incremental
895 && !state.is_incremental_disabled()
896 && matches!(self.config.query_type, QueryType::Sql)
897 {
898 MAX_INCREMENTAL_DIRTY_WINDOW_FILTERS
904 } else {
905 max_window_cnt
906 .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query)
907 };
908 let expr = state.dirty_time_windows.gen_filter_exprs(
909 &col_name,
910 Some(expire_lower_bound),
911 window_size,
912 window_cnt,
913 self.config.flow_id,
914 Some(self),
915 )?;
916 let can_advance_checkpoints = state.dirty_time_windows.is_empty();
917 (expr, can_advance_checkpoints)
918 };
919
920 let Some(expr) = expr else {
921 debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
923 return Ok(None);
924 };
925
926 let filter_sql = expr_to_sql(&expr.expr)
927 .map(|sql| sql.to_string())
928 .unwrap_or_else(|err| format!("<failed to format filter expr: {err}>"));
929
930 debug!(
931 "Flow id={:?}, Generated filter expr: {:?}",
932 self.config.flow_id, filter_sql
933 );
934
935 let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
936 let mut add_auto_column = ColumnMatcherRewriter::new(
937 sink_table_schema.clone(),
938 primary_key_indices.to_vec(),
939 allow_partial,
940 );
941
942 let plan = self.restore_scoped_dirty_windows_on_err(
943 &expr,
944 sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await,
945 )?;
946 let rewrite = self.restore_scoped_dirty_windows_on_err(
947 &expr,
948 plan.clone()
949 .rewrite(&mut add_filter)
950 .and_then(|p| p.data.rewrite(&mut add_auto_column))
951 .with_context(|_| DatafusionSnafu {
952 context: format!("Failed to rewrite plan:\n {}\n", plan),
953 })
954 .map(|rewrite| rewrite.data),
955 )?;
956 let new_plan = self.restore_scoped_dirty_windows_on_err(
958 &expr,
959 apply_df_optimizer(rewrite, &query_ctx).await,
960 )?;
961
962 let info = PlanInfo {
963 plan: new_plan.clone(),
964 dirty_restore: DirtyRestore::Scoped(expr),
965 can_advance_checkpoints,
966 };
967
968 Ok(Some(info))
969 }
970}
971
972#[cfg(test)]
973mod test;