Skip to main content

flow/batching_mode/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::CreateTableExpr;
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::DFSchemaRef;
29use datafusion_common::tree_node::{Transformed, TreeNode};
30use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp, col, lit};
31use datatypes::schema::Schema;
32use query::QueryEngineRef;
33use query::options::FLOW_INCREMENTAL_MODE;
34use query::query_engine::DefaultSerializer;
35use session::context::QueryContextRef;
36use snafu::{OptionExt, ResultExt};
37use sql::parsers::utils::is_tql;
38use store_api::mito_engine_options::MERGE_MODE_KEY;
39use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
40use table::table::adapter::DfTableProviderAdapter;
41use tokio::sync::oneshot::error::TryRecvError;
42use tokio::sync::{Mutex, oneshot};
43use tokio::time::Instant;
44
45use crate::batching_mode::BatchingModeOptions;
46use crate::batching_mode::checkpoint::checkpoint_mode_label;
47use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc};
48use crate::batching_mode::state::{
49    CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState, to_df_literal,
50};
51use crate::batching_mode::table_creator::{QueryType, create_table_with_expr};
52use crate::batching_mode::time_window::TimeWindowExpr;
53use crate::batching_mode::utils::{
54    AddFilterRewriter, ColumnMatcherRewriter, gen_plan_with_matching_schema,
55    get_table_info_df_schema, sql_to_df_plan,
56};
57use crate::df_optimizer::apply_df_optimizer;
58use crate::error::{
59    DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, SubstraitEncodeLogicalPlanSnafu,
60    UnexpectedSnafu,
61};
62use crate::metrics::{
63    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
64    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
65    METRIC_FLOW_ROWS,
66};
67use crate::{Error, FlowId};
68
69mod ckpt;
70mod inc;
71
72/// The task's config, immutable once created
73#[derive(Clone)]
74pub struct TaskConfig {
75    pub flow_id: FlowId,
76    pub query: String,
77    /// output schema of the query
78    pub output_schema: DFSchemaRef,
79    pub time_window_expr: Option<TimeWindowExpr>,
80    /// in seconds
81    pub expire_after: Option<i64>,
82    pub sink_table_name: [String; 3],
83    pub source_table_names: HashSet<[String; 3]>,
84    pub catalog_manager: CatalogManagerRef,
85    pub query_type: QueryType,
86    pub batch_opts: Arc<BatchingModeOptions>,
87    pub flow_eval_interval: Option<Duration>,
88}
89
90fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
91    let is_tql = is_tql(query_ctx.sql_dialect(), query)
92        .map_err(BoxedError::new)
93        .context(ExternalSnafu)?;
94    Ok(if is_tql {
95        QueryType::Tql
96    } else {
97        QueryType::Sql
98    })
99}
100
101fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
102    options
103        .get(MERGE_MODE_KEY)
104        .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
105        .unwrap_or(false)
106}
107
108#[derive(Clone)]
109pub struct BatchingTask {
110    pub config: Arc<TaskConfig>,
111    pub state: Arc<RwLock<TaskState>>,
112    /// Serializes plan generation, execution, checkpoint advancement, and dirty
113    /// window restoration for this flow. Without this, a manual flush and the
114    /// background loop can process the same checkpoint range concurrently.
115    execution_lock: Arc<Mutex<()>>,
116}
117
118/// Arguments for creating batching task
119pub struct TaskArgs<'a> {
120    pub flow_id: FlowId,
121    pub query: &'a str,
122    pub plan: LogicalPlan,
123    pub time_window_expr: Option<TimeWindowExpr>,
124    pub expire_after: Option<i64>,
125    pub sink_table_name: [String; 3],
126    pub source_table_names: Vec<[String; 3]>,
127    pub query_ctx: QueryContextRef,
128    pub catalog_manager: CatalogManagerRef,
129    pub shutdown_rx: oneshot::Receiver<()>,
130    pub batch_opts: Arc<BatchingModeOptions>,
131    pub flow_eval_interval: Option<Duration>,
132}
133
134pub struct PlanInfo {
135    pub plan: LogicalPlan,
136    pub dirty_restore: DirtyRestore,
137    pub can_advance_checkpoints: bool,
138}
139
140pub enum DirtyRestore {
141    /// The query was scoped to dirty time ranges; restore those ranges if the
142    /// run fails.
143    Scoped(FilterExprInfo),
144    /// The query could not be scoped to dirty time ranges, so the dirty-window
145    /// state is only a dirty signal. Restore the consumed signal if the full
146    /// run fails.
147    ///
148    /// TODO(discord9): Full-query runs only need a dirty bool flag. Refactor
149    /// the unscoped path to stop reusing `DirtyTimeWindows` for this signal.
150    Unscoped(DirtyTimeWindows),
151}
152
153struct ExecuteOnceOutcome {
154    new_query: Option<PlanInfo>,
155    /// Execution result of the generated insert plan.
156    ///
157    /// `Ok(Some((affected_rows, elapsed)))` means a query was executed.
158    /// `Ok(None)` means no query was generated because there was no dirty signal.
159    /// `Err(_)` means plan generation or execution failed.
160    result: Result<Option<(usize, Duration)>, Error>,
161}
162
163impl BatchingTask {
164    #[allow(clippy::too_many_arguments)]
165    pub fn try_new(
166        TaskArgs {
167            flow_id,
168            query,
169            plan,
170            time_window_expr,
171            expire_after,
172            sink_table_name,
173            source_table_names,
174            query_ctx,
175            catalog_manager,
176            shutdown_rx,
177            batch_opts,
178            flow_eval_interval,
179        }: TaskArgs<'_>,
180    ) -> Result<Self, Error> {
181        let mut state = TaskState::with_dirty_time_windows(
182            query_ctx.clone(),
183            shutdown_rx,
184            DirtyTimeWindows::new(
185                batch_opts.experimental_max_filter_num_per_query,
186                batch_opts.experimental_time_window_merge_threshold,
187            ),
188        );
189        if !batch_opts.experimental_enable_incremental_read {
190            state.disable_incremental();
191        }
192
193        Ok(Self {
194            config: Arc::new(TaskConfig {
195                flow_id,
196                query: query.to_string(),
197                time_window_expr,
198                expire_after,
199                sink_table_name,
200                source_table_names: source_table_names.into_iter().collect(),
201                catalog_manager,
202                output_schema: plan.schema().clone(),
203                query_type: determine_query_type(query, &query_ctx)?,
204                batch_opts,
205                flow_eval_interval,
206            }),
207            state: Arc::new(RwLock::new(state)),
208            execution_lock: Arc::new(Mutex::new(())),
209        })
210    }
211
212    pub fn last_execution_time_millis(&self) -> Option<i64> {
213        self.state.read().unwrap().last_execution_time_millis()
214    }
215
216    /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
217    ///
218    /// useful for flush_flow to flush dirty time windows range
219    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
220        let now = SystemTime::now();
221        let now = Timestamp::new_second(
222            now.duration_since(UNIX_EPOCH)
223                .expect("Time went backwards")
224                .as_secs() as _,
225        );
226        let lower_bound = self
227            .config
228            .expire_after
229            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
230            .transpose()
231            .map_err(BoxedError::new)
232            .context(ExternalSnafu)?
233            .unwrap_or(Timestamp::new_second(0));
234        debug!(
235            "Flow {} mark range ({:?}, {:?}) as dirty",
236            self.config.flow_id, lower_bound, now
237        );
238        self.state
239            .write()
240            .unwrap()
241            .dirty_time_windows
242            .add_window(lower_bound, Some(now));
243        Ok(())
244    }
245
246    /// Create sink table if not exists
247    pub async fn check_or_create_sink_table(
248        &self,
249        engine: &QueryEngineRef,
250        frontend_client: &Arc<FrontendClient>,
251    ) -> Result<Option<(usize, Duration)>, Error> {
252        if !self.is_table_exist(&self.config.sink_table_name).await? {
253            let create_table = self.gen_create_table_expr(engine.clone()).await?;
254            info!(
255                "Try creating sink table(if not exists) with expr: {:?}",
256                create_table
257            );
258            self.create_table(frontend_client, create_table).await?;
259            info!(
260                "Sink table {}(if not exists) created",
261                self.config.sink_table_name.join(".")
262            );
263        }
264
265        Ok(None)
266    }
267
268    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
269        self.config
270            .catalog_manager
271            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
272            .await
273            .map_err(BoxedError::new)
274            .context(ExternalSnafu)
275    }
276
277    pub(crate) async fn execute_once_serialized(
278        &self,
279        engine: &QueryEngineRef,
280        frontend_client: &Arc<FrontendClient>,
281        max_window_cnt: Option<usize>,
282    ) -> Result<Option<(usize, Duration)>, Error> {
283        let outcome = self
284            .execute_once_serialized_with_outcome(engine, frontend_client, max_window_cnt)
285            .await;
286        outcome.result
287    }
288
289    /// Executes one flow evaluation under `execution_lock` and keeps the
290    /// generated query context for the background loop's error logging/backoff.
291    async fn execute_once_serialized_with_outcome(
292        &self,
293        engine: &QueryEngineRef,
294        frontend_client: &Arc<FrontendClient>,
295        max_window_cnt: Option<usize>,
296    ) -> ExecuteOnceOutcome {
297        let _execution_guard = self.execution_lock.lock().await;
298        self.execute_once_unlocked(engine, frontend_client, max_window_cnt)
299            .await
300    }
301
302    /// Executes one flow evaluation. Caller must hold `execution_lock`.
303    async fn execute_once_unlocked(
304        &self,
305        engine: &QueryEngineRef,
306        frontend_client: &Arc<FrontendClient>,
307        max_window_cnt: Option<usize>,
308    ) -> ExecuteOnceOutcome {
309        let new_query = match self.gen_insert_plan_unlocked(engine, max_window_cnt).await {
310            Ok(new_query) => new_query,
311            Err(err) => {
312                return ExecuteOnceOutcome {
313                    new_query: None,
314                    result: Err(err),
315                };
316            }
317        };
318
319        if let Some(new_query) = new_query {
320            debug!("Generate new query: {}", new_query.plan);
321            let res = self
322                .execute_logical_plan_unlocked(
323                    frontend_client,
324                    &new_query.plan,
325                    new_query.can_advance_checkpoints,
326                )
327                .await;
328            if res.is_err() {
329                self.handle_executed_query_failure(Some(&new_query));
330            }
331            ExecuteOnceOutcome {
332                new_query: Some(new_query),
333                result: res,
334            }
335        } else {
336            debug!("Generate no query");
337            ExecuteOnceOutcome {
338                new_query: None,
339                result: Ok(None),
340            }
341        }
342    }
343
344    /// Generates the insert plan. Caller must reach this through the serialized path.
345    async fn gen_insert_plan_unlocked(
346        &self,
347        engine: &QueryEngineRef,
348        max_window_cnt: Option<usize>,
349    ) -> Result<Option<PlanInfo>, Error> {
350        let (table, df_schema) = get_table_info_df_schema(
351            self.config.catalog_manager.clone(),
352            self.config.sink_table_name.clone(),
353        )
354        .await?;
355
356        let table_meta = &table.table_info().meta;
357        let merge_mode_last_non_null =
358            is_merge_mode_last_non_null(&table_meta.options.extra_options);
359        let primary_key_indices = table_meta.primary_key_indices.clone();
360
361        let new_query = self
362            .gen_query_with_time_window(
363                engine.clone(),
364                &table.table_info().meta.schema,
365                &primary_key_indices,
366                merge_mode_last_non_null,
367                max_window_cnt,
368            )
369            .await?;
370
371        let Some(new_query) = new_query else {
372            return Ok(None);
373        };
374
375        // first check if all columns in input query exists in sink table
376        // since insert into ref to names in record batch generate by given query
377        let table_columns = df_schema
378            .columns()
379            .into_iter()
380            .map(|c| c.name)
381            .collect::<BTreeSet<_>>();
382        for column in new_query.plan.schema().columns() {
383            if !table_columns.contains(column.name()) {
384                self.restore_dirty_windows_after_failure(&new_query);
385                return InvalidQuerySnafu {
386                    reason: format!(
387                        "Column {} not found in sink table with columns {:?}",
388                        column, table_columns
389                    ),
390                }
391                .fail();
392            }
393        }
394
395        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
396        let table_source = Arc::new(DefaultTableSource::new(table_provider));
397
398        // update_at& time index placeholder (if exists) should have default value
399        let plan = LogicalPlan::Dml(DmlStatement::new(
400            datafusion_common::TableReference::Full {
401                catalog: self.config.sink_table_name[0].clone().into(),
402                schema: self.config.sink_table_name[1].clone().into(),
403                table: self.config.sink_table_name[2].clone().into(),
404            },
405            table_source,
406            WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
407            Arc::new(new_query.plan.clone()),
408        ));
409        let insert_into_info = PlanInfo {
410            plan,
411            dirty_restore: new_query.dirty_restore,
412            can_advance_checkpoints: new_query.can_advance_checkpoints,
413        };
414        let insert_into =
415            match insert_into_info
416                .plan
417                .clone()
418                .recompute_schema()
419                .context(DatafusionSnafu {
420                    context: "Failed to recompute schema",
421                }) {
422                Ok(insert_into) => insert_into,
423                Err(err) => {
424                    self.restore_dirty_windows_after_failure(&insert_into_info);
425                    return Err(err);
426                }
427            };
428
429        Ok(Some(PlanInfo {
430            plan: insert_into,
431            dirty_restore: insert_into_info.dirty_restore,
432            can_advance_checkpoints: insert_into_info.can_advance_checkpoints,
433        }))
434    }
435
436    pub async fn create_table(
437        &self,
438        frontend_client: &Arc<FrontendClient>,
439        expr: CreateTableExpr,
440    ) -> Result<(), Error> {
441        let catalog = &self.config.sink_table_name[0];
442        let schema = &self.config.sink_table_name[1];
443        frontend_client
444            .create(expr.clone(), catalog, schema)
445            .await?;
446        Ok(())
447    }
448
449    /// Executes the insert plan. Caller must reach this through the serialized path.
450    async fn execute_logical_plan_unlocked(
451        &self,
452        frontend_client: &Arc<FrontendClient>,
453        plan: &LogicalPlan,
454        can_advance_checkpoints: bool,
455    ) -> Result<Option<(usize, Duration)>, Error> {
456        let instant = Instant::now();
457        let flow_id = self.config.flow_id;
458
459        debug!(
460            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
461            self.config.expire_after, &plan
462        );
463
464        let catalog = &self.config.sink_table_name[0];
465        let schema = &self.config.sink_table_name[1];
466
467        // fix all table ref by make it fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
468        let plan = plan
469            .clone()
470            .transform_down_with_subqueries(|p| {
471                if let LogicalPlan::TableScan(mut table_scan) = p {
472                    let resolved = table_scan.table_name.resolve(catalog, schema);
473                    table_scan.table_name = resolved.into();
474                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
475                } else {
476                    Ok(Transformed::no(p))
477                }
478            })
479            .with_context(|_| DatafusionSnafu {
480                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
481            })?
482            .data;
483
484        // For incremental-mode SQL queries, attempt to rewrite the delta aggregate
485        // plan into a safe delta-LEFT-JOIN-sink form before deciding on extensions.
486        let incremental_plan = if can_advance_checkpoints {
487            self.prepare_plan_for_incremental(&plan).await?
488        } else {
489            None
490        };
491        let incremental_safe = incremental_plan.is_some();
492        let plan = incremental_plan.unwrap_or_else(|| plan.clone());
493
494        let extensions = self
495            .build_flow_query_extensions(incremental_safe, can_advance_checkpoints)
496            .await?;
497        let extension_refs = extensions
498            .iter()
499            .map(|(key, value)| (*key, value.as_str()))
500            .collect::<Vec<_>>();
501        let query_mode = if extensions
502            .iter()
503            .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
504        {
505            CheckpointMode::Incremental
506        } else {
507            CheckpointMode::FullSnapshot
508        };
509        Self::record_query_mode(flow_id, query_mode);
510        debug!(
511            "Flow {flow_id} executing batching query with checkpoint_mode={}, extension_count={}",
512            checkpoint_mode_label(query_mode),
513            extensions.len()
514        );
515
516        let mut peer_desc = None;
517        let res = {
518            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
519                .with_label_values(&[flow_id.to_string().as_str()])
520                .start_timer();
521
522            let req = if let Some((insert_to, insert_plan)) =
523                breakup_insert_plan(&plan, catalog, schema)
524            {
525                let message = DFLogicalSubstraitConvertor {}
526                    .encode(&insert_plan, DefaultSerializer)
527                    .context(SubstraitEncodeLogicalPlanSnafu)?;
528                api::v1::QueryRequest {
529                    query: Some(api::v1::query_request::Query::InsertIntoPlan(
530                        api::v1::InsertIntoPlan {
531                            table_name: Some(insert_to),
532                            logical_plan: message.to_vec(),
533                        },
534                    )),
535                }
536            } else {
537                let message = DFLogicalSubstraitConvertor {}
538                    .encode(&plan, DefaultSerializer)
539                    .context(SubstraitEncodeLogicalPlanSnafu)?;
540
541                api::v1::QueryRequest {
542                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
543                }
544            };
545
546            frontend_client
547                .query_with_terminal_metrics(catalog, schema, req, &extension_refs, &mut peer_desc)
548                .await
549        };
550
551        let elapsed = instant.elapsed();
552        let peer_label = peer_desc
553            .as_ref()
554            .map(ToString::to_string)
555            .unwrap_or_else(|| PeerDesc::default().to_string());
556        if let Err(err) = &res {
557            warn!(
558                "Failed to execute Flow {flow_id} on frontend {peer_label}, result: {err:?}, elapsed: {:?} with query: {}",
559                elapsed, &plan
560            );
561            let decision = {
562                let mut state = self.state.write().unwrap();
563                let reason = Self::query_failure_reason(err);
564                Self::apply_query_failure_to_state(&mut state, elapsed, reason)
565            };
566            if let Some(decision) = decision {
567                Self::record_checkpoint_decision(flow_id, decision);
568            }
569        }
570
571        // record slow query
572        if elapsed >= self.config.batch_opts.slow_query_threshold {
573            warn!(
574                "Flow {flow_id} on frontend {peer_label} executed for {:?} before complete, query: {}",
575                elapsed, &plan
576            );
577            let flow_id = flow_id.to_string();
578            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
579                .with_label_values(&[flow_id.as_str(), peer_label.as_str()])
580                .observe(elapsed.as_secs_f64());
581        }
582
583        let res = res?;
584        let (affected_rows, _) = res.output.extract_rows_and_cost();
585        debug!(
586            "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}, watermark: {:?}",
587            elapsed,
588            res.region_watermark_map()
589        );
590        METRIC_FLOW_ROWS
591            .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
592            .inc_by(affected_rows as _);
593        {
594            let mut state = self.state.write().unwrap();
595            let decision = Self::apply_query_result_to_state(
596                &mut state,
597                &res,
598                elapsed,
599                can_advance_checkpoints,
600            );
601            Self::record_checkpoint_decision(flow_id, decision);
602        }
603
604        Ok(Some((affected_rows, elapsed)))
605    }
606
607    /// Restore dirty windows consumed by a failed query so they are retried on
608    /// the next execution.
609    ///
610    fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) {
611        match &query.dirty_restore {
612            DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter),
613            DirtyRestore::Unscoped(dirty_windows) => self
614                .state
615                .write()
616                .unwrap()
617                .dirty_time_windows
618                .add_dirty_windows(dirty_windows),
619        }
620    }
621
622    fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) {
623        self.state
624            .write()
625            .unwrap()
626            .dirty_time_windows
627            .add_windows(filter.time_ranges.clone());
628    }
629
630    fn restore_scoped_dirty_windows_on_err<T>(
631        &self,
632        filter: &FilterExprInfo,
633        result: Result<T, Error>,
634    ) -> Result<T, Error> {
635        result.inspect_err(|_| {
636            self.restore_scoped_dirty_windows(filter);
637        })
638    }
639
640    fn restore_unscoped_dirty_windows(&self, dirty_windows: &DirtyTimeWindows) {
641        self.state
642            .write()
643            .unwrap()
644            .dirty_time_windows
645            .add_dirty_windows(dirty_windows);
646    }
647
648    fn restore_unscoped_dirty_windows_on_err<T>(
649        &self,
650        dirty_windows: &DirtyTimeWindows,
651        result: Result<T, Error>,
652    ) -> Result<T, Error> {
653        result.inspect_err(|_| {
654            self.restore_unscoped_dirty_windows(dirty_windows);
655        })
656    }
657
658    fn drain_dirty_windows_signal(&self) -> (bool, DirtyTimeWindows) {
659        let mut state = self.state.write().unwrap();
660        let dirty_windows_to_restore = state.dirty_time_windows.clone();
661        let is_dirty = !dirty_windows_to_restore.is_empty();
662        state.dirty_time_windows.clean();
663        (is_dirty, dirty_windows_to_restore)
664    }
665
666    #[allow(clippy::too_many_arguments)]
667    async fn gen_unfiltered_plan_info(
668        &self,
669        engine: QueryEngineRef,
670        query_ctx: QueryContextRef,
671        sink_table_schema: Arc<Schema>,
672        primary_key_indices: &[usize],
673        allow_partial: bool,
674        dirty_windows_to_restore: DirtyTimeWindows,
675        retention_filter: Option<(&str, Timestamp, &'static str)>,
676    ) -> Result<PlanInfo, Error> {
677        let mut plan = self.restore_unscoped_dirty_windows_on_err(
678            &dirty_windows_to_restore,
679            gen_plan_with_matching_schema(
680                &self.config.query,
681                query_ctx,
682                engine,
683                sink_table_schema,
684                primary_key_indices,
685                allow_partial,
686            )
687            .await,
688        )?;
689
690        if let Some((col_name, lower_bound, context)) = retention_filter {
691            let lower = self.restore_unscoped_dirty_windows_on_err(
692                &dirty_windows_to_restore,
693                to_df_literal(lower_bound),
694            )?;
695            let retention_filter = col(col_name).gt_eq(lit(lower));
696            let mut add_filter = AddFilterRewriter::new(retention_filter);
697            plan = self.restore_unscoped_dirty_windows_on_err(
698                &dirty_windows_to_restore,
699                plan.clone()
700                    .rewrite(&mut add_filter)
701                    .with_context(|_| DatafusionSnafu {
702                        context: format!(
703                            "Failed to apply {context} expire_after filter to plan:\n {}\n",
704                            plan
705                        ),
706                    })
707                    .map(|rewrite| rewrite.data),
708            )?;
709        }
710
711        Ok(PlanInfo {
712            plan,
713            dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
714            can_advance_checkpoints: true,
715        })
716    }
717
718    async fn gen_unfiltered_plan_info_if_dirty(
719        &self,
720        engine: QueryEngineRef,
721        query_ctx: QueryContextRef,
722        sink_table_schema: Arc<Schema>,
723        primary_key_indices: &[usize],
724        allow_partial: bool,
725        retention_filter: Option<(&str, Timestamp, &'static str)>,
726    ) -> Result<Option<PlanInfo>, Error> {
727        let (is_dirty, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
728        if !is_dirty {
729            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
730            return Ok(None);
731        }
732
733        self.gen_unfiltered_plan_info(
734            engine,
735            query_ctx,
736            sink_table_schema,
737            primary_key_indices,
738            allow_partial,
739            dirty_windows_to_restore,
740            retention_filter,
741        )
742        .await
743        .map(Some)
744    }
745
746    fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) {
747        if let Some(query) = query {
748            self.restore_dirty_windows_after_failure(query);
749        }
750    }
751
752    /// start executing query in a loop, break when receive shutdown signal
753    ///
754    /// any error will be logged when executing query
755    pub async fn start_executing_loop(
756        &self,
757        engine: QueryEngineRef,
758        frontend_client: Arc<FrontendClient>,
759    ) {
760        let flow_id_str = self.config.flow_id.to_string();
761        let mut max_window_cnt = None;
762        let mut interval = self
763            .config
764            .flow_eval_interval
765            .map(|d| tokio::time::interval(d));
766        if let Some(tick) = &mut interval {
767            tick.tick().await; // pass the first tick immediately
768        }
769        loop {
770            // first check if shutdown signal is received
771            // if so, break the loop
772            {
773                let mut state = self.state.write().unwrap();
774                match state.shutdown_rx.try_recv() {
775                    Ok(()) => break,
776                    Err(TryRecvError::Closed) => {
777                        warn!(
778                            "Unexpected shutdown flow {}, shutdown anyway",
779                            self.config.flow_id
780                        );
781                        break;
782                    }
783                    Err(TryRecvError::Empty) => (),
784                }
785            }
786            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
787                .with_label_values(&[&flow_id_str])
788                .inc();
789
790            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
791
792            let outcome = self
793                .execute_once_serialized_with_outcome(&engine, &frontend_client, max_window_cnt)
794                .await;
795
796            match outcome.result {
797                // normal execute, sleep for some time before doing next query
798                Ok(Some(_)) => {
799                    // can increase max_window_cnt to query more windows next time
800                    max_window_cnt = max_window_cnt.map(|cnt| {
801                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
802                    });
803
804                    // here use proper ticking if set eval interval
805                    if let Some(eval_interval) = &mut interval {
806                        eval_interval.tick().await;
807                    } else {
808                        // if not explicitly set, just automatically calculate next start time
809                        // using time window size and more args
810                        let sleep_until = {
811                            let state = self.state.write().unwrap();
812
813                            let time_window_size = self
814                                .config
815                                .time_window_expr
816                                .as_ref()
817                                .and_then(|t| *t.time_window_size());
818
819                            let prefer_short_incremental_cadence = state.checkpoint_mode()
820                                == CheckpointMode::Incremental
821                                && !state.is_incremental_disabled();
822
823                            state.get_next_start_query_time(
824                                self.config.flow_id,
825                                &time_window_size,
826                                min_refresh,
827                                Some(self.config.batch_opts.query_timeout),
828                                self.config.batch_opts.experimental_max_filter_num_per_query,
829                                prefer_short_incremental_cadence,
830                            )
831                        };
832
833                        tokio::time::sleep_until(sleep_until).await;
834                    };
835                }
836                // no new data, sleep for some time before checking for new data
837                Ok(None) => {
838                    debug!(
839                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
840                        self.config.flow_id, min_refresh
841                    );
842                    tokio::time::sleep(min_refresh).await;
843                    continue;
844                }
845                // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
846                Err(err) => {
847                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
848                        .with_label_values(&[&flow_id_str])
849                        .inc();
850                    match outcome.new_query {
851                        Some(query) => {
852                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
853                            // TODO(discord9): add some backoff here? half the query time window or what
854                            // backoff meaning use smaller `max_window_cnt` for next query
855
856                            // since last query failed, we should not try to query too many windows
857                            max_window_cnt = Some(1);
858                        }
859                        None => {
860                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
861                        }
862                    }
863                    // also sleep for a little while before try again to prevent flooding logs
864                    tokio::time::sleep(min_refresh).await;
865                }
866            }
867        }
868    }
869
870    /// Generate the create table SQL
871    ///
872    /// the auto created table will automatically added a `update_at` Milliseconds DEFAULT now() column in the end
873    /// (for compatibility with flow streaming mode)
874    ///
875    /// and it will use first timestamp column as time index, all other columns will be added as normal columns and nullable
876    async fn gen_create_table_expr(
877        &self,
878        engine: QueryEngineRef,
879    ) -> Result<CreateTableExpr, Error> {
880        let query_ctx = self.state.read().unwrap().query_ctx.clone();
881        let plan =
882            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
883        create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
884    }
885
886    fn should_use_unfiltered_incremental_delta(&self) -> bool {
887        let state = self.state.read().unwrap();
888        state.checkpoint_mode() == CheckpointMode::Incremental
889            && !state.is_incremental_disabled()
890            && matches!(self.config.query_type, QueryType::Sql)
891    }
892
893    fn should_use_unfiltered_full_snapshot_seeding(&self) -> bool {
894        let state = self.state.read().unwrap();
895        state.checkpoint_mode() == CheckpointMode::FullSnapshot
896            && !state.is_incremental_disabled()
897            && matches!(self.config.query_type, QueryType::Sql)
898    }
899
900    /// will merge and use the first ten time window in query
901    async fn gen_query_with_time_window(
902        &self,
903        engine: QueryEngineRef,
904        sink_table_schema: &Arc<Schema>,
905        primary_key_indices: &[usize],
906        allow_partial: bool,
907        max_window_cnt: Option<usize>,
908    ) -> Result<Option<PlanInfo>, Error> {
909        let query_ctx = self.state.read().unwrap().query_ctx.clone();
910        let start = SystemTime::now();
911        let since_the_epoch = start
912            .duration_since(UNIX_EPOCH)
913            .expect("Time went backwards");
914        let low_bound = self
915            .config
916            .expire_after
917            .map(|e| since_the_epoch.as_secs() - e as u64)
918            .unwrap_or(u64::MIN);
919
920        let low_bound = Timestamp::new_second(low_bound as i64);
921
922        let expire_time_window_bound = self
923            .config
924            .time_window_expr
925            .as_ref()
926            .map(|expr| expr.eval(low_bound))
927            .transpose()?;
928
929        let (expire_lower_bound, expire_upper_bound) =
930            match (expire_time_window_bound, &self.config.query_type) {
931                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
932                (None, QueryType::Sql) => {
933                    // if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns)
934                    // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
935                    debug!(
936                        "Flow id = {:?}, no time window, using the same query",
937                        self.config.flow_id
938                    );
939                    // clean dirty time window too, this could be from create flow's check_execute
940                    return self
941                        .gen_unfiltered_plan_info_if_dirty(
942                            engine,
943                            query_ctx,
944                            sink_table_schema.clone(),
945                            primary_key_indices,
946                            allow_partial,
947                            None,
948                        )
949                        .await;
950                }
951                _ => {
952                    // Clean dirty windows for full-query/non-scoped paths,
953                    // such as TQL, that cannot use a time-window filter.
954                    let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
955
956                    let plan_info = self
957                        .gen_unfiltered_plan_info(
958                            engine,
959                            query_ctx,
960                            sink_table_schema.clone(),
961                            primary_key_indices,
962                            allow_partial,
963                            dirty_windows_to_restore,
964                            None,
965                        )
966                        .await?;
967
968                    return Ok(Some(plan_info));
969                }
970            };
971
972        debug!(
973            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
974            self.config.flow_id,
975            expire_lower_bound,
976            expire_upper_bound,
977            self.state.read().unwrap().dirty_time_windows
978        );
979        let window_size = expire_upper_bound
980            .sub(&expire_lower_bound)
981            .with_context(|| UnexpectedSnafu {
982                reason: format!(
983                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
984                ),
985            })?;
986        let col_name = self
987            .config
988            .time_window_expr
989            .as_ref()
990            .map(|expr| expr.column_name.clone())
991            .with_context(|| UnexpectedSnafu {
992                reason: format!(
993                    "Flow id={:?}, Failed to get column name from time window expr",
994                    self.config.flow_id
995                ),
996            })?;
997
998        if self.should_use_unfiltered_full_snapshot_seeding() {
999            // A full-snapshot query that can seed/refresh incremental
1000            // checkpoints must not use dirty-window predicates. Rows can be
1001            // written after dirty windows are drained but before the source scan
1002            // snapshot opens; a stale dirty-window filter could exclude those
1003            // rows while the returned watermark includes them, causing the next
1004            // incremental read to skip them forever. Execute an unfiltered full
1005            // snapshot instead, and keep dirty windows only as the scheduling and
1006            // failure-restoration signal.
1007            let retention_filter = self
1008                .config
1009                .expire_after
1010                .map(|_| (col_name.as_str(), expire_lower_bound, "full-snapshot"));
1011            return self
1012                .gen_unfiltered_plan_info_if_dirty(
1013                    engine,
1014                    query_ctx,
1015                    sink_table_schema.clone(),
1016                    primary_key_indices,
1017                    allow_partial,
1018                    retention_filter,
1019                )
1020                .await;
1021        }
1022
1023        if self.should_use_unfiltered_incremental_delta() {
1024            // In incremental mode, source correctness is defined by the
1025            // per-region sequence range `(checkpoint, scan-open snapshot]`, not
1026            // by dirty-window predicates. Dirty windows are only a scheduling
1027            // signal here. Applying a stale dirty-window filter to the source can
1028            // exclude rows that are inside the returned watermark and make a
1029            // checkpoint advance skip them forever. The sink side is also left
1030            // unfiltered by dirty windows; the incremental rewrite joins the
1031            // delta groups with the full sink state for correctness. Future
1032            // dynamic filters can prune sink reads as a pure optimization.
1033            let retention_filter = self
1034                .config
1035                .expire_after
1036                .map(|_| (col_name.as_str(), expire_lower_bound, "incremental"));
1037            return self
1038                .gen_unfiltered_plan_info_if_dirty(
1039                    engine,
1040                    query_ctx,
1041                    sink_table_schema.clone(),
1042                    primary_key_indices,
1043                    allow_partial,
1044                    retention_filter,
1045                )
1046                .await;
1047        }
1048
1049        let (expr, can_advance_checkpoints) = {
1050            let mut state = self.state.write().unwrap();
1051            let window_cnt = max_window_cnt
1052                .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query);
1053            let expr = state.dirty_time_windows.gen_filter_exprs(
1054                &col_name,
1055                Some(expire_lower_bound),
1056                window_size,
1057                window_cnt,
1058                self.config.flow_id,
1059                Some(self),
1060            )?;
1061            let can_advance_checkpoints = state.dirty_time_windows.is_empty();
1062            (expr, can_advance_checkpoints)
1063        };
1064
1065        let Some(expr) = expr else {
1066            // no new data, hence no need to update
1067            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
1068            return Ok(None);
1069        };
1070
1071        let filter_sql = expr_to_sql(&expr.expr)
1072            .map(|sql| sql.to_string())
1073            .unwrap_or_else(|err| format!("<failed to format filter expr: {err}>"));
1074
1075        debug!(
1076            "Flow id={:?}, Generated filter expr: {:?}",
1077            self.config.flow_id, filter_sql
1078        );
1079
1080        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
1081        let mut add_auto_column = ColumnMatcherRewriter::new(
1082            sink_table_schema.clone(),
1083            primary_key_indices.to_vec(),
1084            allow_partial,
1085        );
1086
1087        let plan = self.restore_scoped_dirty_windows_on_err(
1088            &expr,
1089            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await,
1090        )?;
1091        let rewrite = self.restore_scoped_dirty_windows_on_err(
1092            &expr,
1093            plan.clone()
1094                .rewrite(&mut add_filter)
1095                .and_then(|p| p.data.rewrite(&mut add_auto_column))
1096                .with_context(|_| DatafusionSnafu {
1097                    context: format!("Failed to rewrite plan:\n {}\n", plan),
1098                })
1099                .map(|rewrite| rewrite.data),
1100        )?;
1101        // only apply optimize after complex rewrite is done
1102        let new_plan = self.restore_scoped_dirty_windows_on_err(
1103            &expr,
1104            apply_df_optimizer(rewrite, &query_ctx).await,
1105        )?;
1106
1107        let info = PlanInfo {
1108            plan: new_plan.clone(),
1109            dirty_restore: DirtyRestore::Scoped(expr),
1110            can_advance_checkpoints,
1111        };
1112
1113        Ok(Some(info))
1114    }
1115}
1116
1117#[cfg(test)]
1118mod test;