Skip to main content

flow/batching_mode/
task.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18
19use api::v1::{CreateTableExpr, TableName};
20use catalog::CatalogManagerRef;
21use common_error::ext::BoxedError;
22use common_query::logical_plan::breakup_insert_plan;
23use common_telemetry::tracing::warn;
24use common_telemetry::{debug, info};
25use common_time::Timestamp;
26use datafusion::datasource::DefaultTableSource;
27use datafusion::sql::unparser::expr_to_sql;
28use datafusion_common::tree_node::{Transformed, TreeNode};
29use datafusion_common::utils::quote_identifier;
30use datafusion_common::{DFSchemaRef, TableReference};
31use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp, col, lit};
32use datatypes::schema::Schema;
33use query::QueryEngineRef;
34use query::options::FLOW_INCREMENTAL_MODE;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::{OptionExt, ResultExt};
38use sql::parsers::utils::is_tql;
39use store_api::mito_engine_options::MERGE_MODE_KEY;
40use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
41use table::table::adapter::DfTableProviderAdapter;
42use tokio::sync::oneshot::error::TryRecvError;
43use tokio::sync::{Mutex, oneshot};
44use tokio::time::Instant;
45
46use crate::batching_mode::BatchingModeOptions;
47use crate::batching_mode::checkpoint::checkpoint_mode_label;
48use crate::batching_mode::frontend_client::{FrontendClient, PeerDesc};
49use crate::batching_mode::state::{
50    CheckpointMode, DirtyTimeWindows, FilterExprInfo, TaskState, to_df_literal,
51};
52use crate::batching_mode::table_creator::{QueryType, create_table_with_expr};
53use crate::batching_mode::time_window::TimeWindowExpr;
54use crate::batching_mode::utils::{
55    AddFilterRewriter, ColumnMatcherRewriter, df_plan_to_sql, gen_plan_with_matching_schema,
56    get_table_info_df_schema, sql_to_df_plan,
57};
58use crate::df_optimizer::apply_df_optimizer;
59use crate::error::{
60    DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, SubstraitEncodeLogicalPlanSnafu,
61    UnexpectedSnafu,
62};
63use crate::metrics::{
64    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
65    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
66    METRIC_FLOW_ROWS,
67};
68use crate::{Error, FlowId};
69
70mod ckpt;
71mod inc;
72
73/// The task's config, immutable once created
74#[derive(Clone)]
75pub struct TaskConfig {
76    pub flow_id: FlowId,
77    pub query: String,
78    /// output schema of the query
79    pub output_schema: DFSchemaRef,
80    pub time_window_expr: Option<TimeWindowExpr>,
81    /// in seconds
82    pub expire_after: Option<i64>,
83    pub sink_table_name: [String; 3],
84    pub source_table_names: HashSet<[String; 3]>,
85    pub catalog_manager: CatalogManagerRef,
86    pub query_type: QueryType,
87    pub batch_opts: Arc<BatchingModeOptions>,
88    pub flow_eval_interval: Option<Duration>,
89}
90
91fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
92    let is_tql = is_tql(query_ctx.sql_dialect(), query)
93        .map_err(BoxedError::new)
94        .context(ExternalSnafu)?;
95    Ok(if is_tql {
96        QueryType::Tql
97    } else {
98        QueryType::Sql
99    })
100}
101
102fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
103    options
104        .get(MERGE_MODE_KEY)
105        .map(|mode| mode.eq_ignore_ascii_case("last_non_null"))
106        .unwrap_or(false)
107}
108
109fn encode_insert_plan_request(
110    insert_to: TableName,
111    insert_input_plan: &LogicalPlan,
112) -> Result<api::v1::QueryRequest, Error> {
113    let message = DFLogicalSubstraitConvertor {}
114        .encode(insert_input_plan, DefaultSerializer)
115        .context(SubstraitEncodeLogicalPlanSnafu)?;
116    Ok(api::v1::QueryRequest {
117        query: Some(api::v1::query_request::Query::InsertIntoPlan(
118            api::v1::InsertIntoPlan {
119                table_name: Some(insert_to),
120                logical_plan: message.to_vec(),
121            },
122        )),
123    })
124}
125
126fn format_insert_target_columns(plan: &LogicalPlan) -> String {
127    plan.schema()
128        .fields()
129        .iter()
130        .map(|field| quote_identifier(field.name()).to_string())
131        .collect::<Vec<_>>()
132        .join(", ")
133}
134
135#[derive(Clone)]
136pub struct BatchingTask {
137    pub config: Arc<TaskConfig>,
138    pub state: Arc<RwLock<TaskState>>,
139    /// Serializes plan generation, execution, checkpoint advancement, and dirty
140    /// window restoration for this flow. Without this, a manual flush and the
141    /// background loop can process the same checkpoint range concurrently.
142    execution_lock: Arc<Mutex<()>>,
143}
144
145/// Arguments for creating batching task
146pub struct TaskArgs<'a> {
147    pub flow_id: FlowId,
148    pub query: &'a str,
149    pub plan: LogicalPlan,
150    pub time_window_expr: Option<TimeWindowExpr>,
151    pub expire_after: Option<i64>,
152    pub sink_table_name: [String; 3],
153    pub source_table_names: Vec<[String; 3]>,
154    pub query_ctx: QueryContextRef,
155    pub catalog_manager: CatalogManagerRef,
156    pub shutdown_rx: oneshot::Receiver<()>,
157    pub batch_opts: Arc<BatchingModeOptions>,
158    pub flow_eval_interval: Option<Duration>,
159}
160
161pub struct PlanInfo {
162    pub plan: LogicalPlan,
163    pub dirty_restore: DirtyRestore,
164    pub coverage: QueryCoverage,
165}
166
/// Describes what portion of the data a generated flow query covers.
#[derive(Clone)]
pub enum QueryCoverage {
    /// An explicit full-query snapshot, e.g. TQL or evaluation-interval SQL
    /// flows whose plan shape cannot be safely pruned by dirty windows. It
    /// must not serve as an implicit recovery path for scoped repair or for
    /// an unsafe incremental rewrite fallback.
    UnfilteredFull,
    /// A full-snapshot repair restricted to the current dirty windows. When
    /// it succeeds, a fenced repair may start if new dirty windows appeared
    /// in the meantime.
    ScopedBaseRepair,
    /// One chunk of windows repaired under the frozen high-watermark `H`.
    /// `high` is sent as the snapshot read bounds, and the terminal
    /// watermarks returned by the query must match it before checkpoints
    /// can advance.
    FencedRepairChunk { high: BTreeMap<u64, u64> },
    /// An incremental delta query over `(checkpoint, scan-open snapshot]`.
    IncrementalDelta,
}
184
185impl QueryCoverage {
186    /// Whether this query should use incremental scan extensions and
187    /// incremental checkpoint advancement rules.
188    fn is_incremental_delta(&self) -> bool {
189        matches!(self, Self::IncrementalDelta)
190    }
191
192    /// Snapshot upper bounds requested from the storage layer. Only fenced
193    /// repair chunks carry bounds; all other coverage relies on normal scans.
194    fn snapshot_seqs(&self) -> HashMap<u64, u64> {
195        match self {
196            Self::FencedRepairChunk { high } => high.iter().map(|(k, v)| (*k, *v)).collect(),
197            _ => HashMap::new(),
198        }
199    }
200}
201
202pub enum DirtyRestore {
203    /// The query was scoped to dirty time ranges; restore those ranges if the
204    /// run fails.
205    Scoped(FilterExprInfo),
206    /// The query could not be scoped to dirty time ranges, so the dirty-window
207    /// state is only a dirty signal. Restore the consumed signal if the full
208    /// run fails.
209    ///
210    /// TODO(discord9): Full-query runs only need a dirty bool flag. Refactor
211    /// the unscoped path to stop reusing `DirtyTimeWindows` for this signal.
212    Unscoped(DirtyTimeWindows),
213}
214
215struct ExecuteOnceOutcome {
216    new_query: Option<PlanInfo>,
217    /// Execution result of the generated insert plan.
218    ///
219    /// `Ok(Some((affected_rows, elapsed)))` means a query was executed.
220    /// `Ok(None)` means no query was generated because there was no dirty signal.
221    /// `Err(_)` means plan generation or execution failed.
222    result: Result<Option<(usize, Duration)>, Error>,
223}
224
225impl BatchingTask {
226    #[allow(clippy::too_many_arguments)]
227    pub fn try_new(
228        TaskArgs {
229            flow_id,
230            query,
231            plan,
232            time_window_expr,
233            expire_after,
234            sink_table_name,
235            source_table_names,
236            query_ctx,
237            catalog_manager,
238            shutdown_rx,
239            batch_opts,
240            flow_eval_interval,
241        }: TaskArgs<'_>,
242    ) -> Result<Self, Error> {
243        let mut state = TaskState::with_dirty_time_windows(
244            query_ctx.clone(),
245            shutdown_rx,
246            DirtyTimeWindows::new(
247                batch_opts.experimental_max_filter_num_per_query,
248                batch_opts.experimental_time_window_merge_threshold,
249            ),
250        );
251        if !batch_opts.experimental_enable_incremental_read {
252            state.disable_incremental();
253        }
254
255        Ok(Self {
256            config: Arc::new(TaskConfig {
257                flow_id,
258                query: query.to_string(),
259                time_window_expr,
260                expire_after,
261                sink_table_name,
262                source_table_names: source_table_names.into_iter().collect(),
263                catalog_manager,
264                output_schema: plan.schema().clone(),
265                query_type: determine_query_type(query, &query_ctx)?,
266                batch_opts,
267                flow_eval_interval,
268            }),
269            state: Arc::new(RwLock::new(state)),
270            execution_lock: Arc::new(Mutex::new(())),
271        })
272    }
273
274    pub fn last_execution_time_millis(&self) -> Option<i64> {
275        self.state.read().unwrap().last_execution_time_millis()
276    }
277
278    /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
279    ///
280    /// useful for flush_flow to flush dirty time windows range
281    pub fn mark_all_windows_as_dirty(&self) -> Result<(), Error> {
282        let now = SystemTime::now();
283        let now = Timestamp::new_second(
284            now.duration_since(UNIX_EPOCH)
285                .expect("Time went backwards")
286                .as_secs() as _,
287        );
288        let lower_bound = self
289            .config
290            .expire_after
291            .map(|e| now.sub_duration(Duration::from_secs(e as _)))
292            .transpose()
293            .map_err(BoxedError::new)
294            .context(ExternalSnafu)?
295            .unwrap_or(Timestamp::new_second(0));
296        debug!(
297            "Flow {} mark range ({:?}, {:?}) as dirty",
298            self.config.flow_id, lower_bound, now
299        );
300        self.state
301            .write()
302            .unwrap()
303            .dirty_time_windows
304            .add_window(lower_bound, Some(now));
305        Ok(())
306    }
307
308    /// Create sink table if not exists
309    pub async fn check_or_create_sink_table(
310        &self,
311        engine: &QueryEngineRef,
312        frontend_client: &Arc<FrontendClient>,
313    ) -> Result<Option<(usize, Duration)>, Error> {
314        if !self.is_table_exist(&self.config.sink_table_name).await? {
315            let create_table = self.gen_create_table_expr(engine.clone()).await?;
316            info!(
317                "Try creating sink table(if not exists) with expr: {:?}",
318                create_table
319            );
320            self.create_table(frontend_client, create_table).await?;
321            info!(
322                "Sink table {}(if not exists) created",
323                self.config.sink_table_name.join(".")
324            );
325        }
326
327        Ok(None)
328    }
329
330    /// Validates that the sink table schema can accept this flow's output.
331    ///
332    /// This is a dry-run of the same schema matching logic used by runtime insert-plan
333    /// generation, but without adding dirty-window filters or executing the query. It is used
334    /// during CREATE FLOW to catch existing sink table mismatches early.
335    pub async fn validate_sink_table_schema(&self, engine: &QueryEngineRef) -> Result<(), Error> {
336        let (table, _) = get_table_info_df_schema(
337            self.config.catalog_manager.clone(),
338            self.config.sink_table_name.clone(),
339        )
340        .await?;
341
342        let table_meta = &table.table_info().meta;
343        let merge_mode_last_non_null =
344            is_merge_mode_last_non_null(&table_meta.options.extra_options);
345        let primary_key_indices = table_meta.primary_key_indices.clone();
346        let query_ctx = self.state.read().unwrap().query_ctx.clone();
347
348        gen_plan_with_matching_schema(
349            &self.config.query,
350            query_ctx,
351            engine.clone(),
352            table_meta.schema.clone(),
353            &primary_key_indices,
354            merge_mode_last_non_null,
355        )
356        .await
357        .map(|_| ())
358    }
359
360    async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
361        self.config
362            .catalog_manager
363            .table_exists(&table_name[0], &table_name[1], &table_name[2], None)
364            .await
365            .map_err(BoxedError::new)
366            .context(ExternalSnafu)
367    }
368
369    pub(crate) async fn execute_once_serialized(
370        &self,
371        engine: &QueryEngineRef,
372        frontend_client: &Arc<FrontendClient>,
373        max_window_cnt: Option<usize>,
374    ) -> Result<Option<(usize, Duration)>, Error> {
375        let outcome = self
376            .execute_once_serialized_with_outcome(engine, frontend_client, max_window_cnt)
377            .await;
378        outcome.result
379    }
380
381    /// Executes one flow evaluation under `execution_lock` and keeps the
382    /// generated query context for the background loop's error logging/backoff.
383    async fn execute_once_serialized_with_outcome(
384        &self,
385        engine: &QueryEngineRef,
386        frontend_client: &Arc<FrontendClient>,
387        max_window_cnt: Option<usize>,
388    ) -> ExecuteOnceOutcome {
389        let _execution_guard = self.execution_lock.lock().await;
390        self.execute_once_unlocked(engine, frontend_client, max_window_cnt)
391            .await
392    }
393
394    /// Executes one flow evaluation. Caller must hold `execution_lock`.
395    async fn execute_once_unlocked(
396        &self,
397        engine: &QueryEngineRef,
398        frontend_client: &Arc<FrontendClient>,
399        max_window_cnt: Option<usize>,
400    ) -> ExecuteOnceOutcome {
401        let new_query = match self.gen_insert_plan_unlocked(engine, max_window_cnt).await {
402            Ok(new_query) => new_query,
403            Err(err) => {
404                return ExecuteOnceOutcome {
405                    new_query: None,
406                    result: Err(err),
407                };
408            }
409        };
410
411        if let Some(new_query) = new_query {
412            debug!("Generate new query: {}", new_query.plan);
413            let res = self
414                .execute_logical_plan_unlocked(
415                    frontend_client,
416                    &new_query.plan,
417                    &new_query.dirty_restore,
418                    &new_query.coverage,
419                )
420                .await;
421            if res.is_err() {
422                self.handle_executed_query_failure(Some(&new_query));
423            }
424            ExecuteOnceOutcome {
425                new_query: Some(new_query),
426                result: res,
427            }
428        } else {
429            debug!("Generate no query");
430            ExecuteOnceOutcome {
431                new_query: None,
432                result: Ok(None),
433            }
434        }
435    }
436
437    /// Generates the insert plan. Caller must reach this through the serialized path.
438    async fn gen_insert_plan_unlocked(
439        &self,
440        engine: &QueryEngineRef,
441        max_window_cnt: Option<usize>,
442    ) -> Result<Option<PlanInfo>, Error> {
443        let (table, df_schema) = get_table_info_df_schema(
444            self.config.catalog_manager.clone(),
445            self.config.sink_table_name.clone(),
446        )
447        .await?;
448
449        let table_meta = &table.table_info().meta;
450        let merge_mode_last_non_null =
451            is_merge_mode_last_non_null(&table_meta.options.extra_options);
452        let primary_key_indices = table_meta.primary_key_indices.clone();
453
454        let new_query = self
455            .gen_query_with_time_window(
456                engine.clone(),
457                &table.table_info().meta.schema,
458                &primary_key_indices,
459                merge_mode_last_non_null,
460                max_window_cnt,
461            )
462            .await?;
463
464        let Some(new_query) = new_query else {
465            return Ok(None);
466        };
467
468        // first check if all columns in input query exists in sink table
469        // since insert into ref to names in record batch generate by given query
470        let table_columns = df_schema
471            .columns()
472            .into_iter()
473            .map(|c| c.name)
474            .collect::<BTreeSet<_>>();
475        for column in new_query.plan.schema().columns() {
476            if !table_columns.contains(column.name()) {
477                self.restore_dirty_windows_after_failure(&new_query);
478                return InvalidQuerySnafu {
479                    reason: format!(
480                        "Column {} not found in sink table with columns {:?}",
481                        column, table_columns
482                    ),
483                }
484                .fail();
485            }
486        }
487
488        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
489        let table_source = Arc::new(DefaultTableSource::new(table_provider));
490
491        // update_at& time index placeholder (if exists) should have default value
492        let plan = LogicalPlan::Dml(DmlStatement::new(
493            datafusion_common::TableReference::Full {
494                catalog: self.config.sink_table_name[0].clone().into(),
495                schema: self.config.sink_table_name[1].clone().into(),
496                table: self.config.sink_table_name[2].clone().into(),
497            },
498            table_source,
499            WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
500            Arc::new(new_query.plan.clone()),
501        ));
502        let insert_into_info = PlanInfo {
503            plan,
504            dirty_restore: new_query.dirty_restore,
505            coverage: new_query.coverage,
506        };
507        let insert_into =
508            match insert_into_info
509                .plan
510                .clone()
511                .recompute_schema()
512                .context(DatafusionSnafu {
513                    context: "Failed to recompute schema",
514                }) {
515                Ok(insert_into) => insert_into,
516                Err(err) => {
517                    self.restore_dirty_windows_after_failure(&insert_into_info);
518                    return Err(err);
519                }
520            };
521
522        Ok(Some(PlanInfo {
523            plan: insert_into,
524            dirty_restore: insert_into_info.dirty_restore,
525            coverage: insert_into_info.coverage,
526        }))
527    }
528
529    pub async fn create_table(
530        &self,
531        frontend_client: &Arc<FrontendClient>,
532        expr: CreateTableExpr,
533    ) -> Result<(), Error> {
534        let catalog = &self.config.sink_table_name[0];
535        let schema = &self.config.sink_table_name[1];
536        frontend_client
537            .create(expr.clone(), catalog, schema)
538            .await?;
539        Ok(())
540    }
541
542    /// Executes the insert plan. Caller must reach this through the serialized path.
543    async fn execute_logical_plan_unlocked(
544        &self,
545        frontend_client: &Arc<FrontendClient>,
546        plan: &LogicalPlan,
547        dirty_restore: &DirtyRestore,
548        coverage: &QueryCoverage,
549    ) -> Result<Option<(usize, Duration)>, Error> {
550        let instant = Instant::now();
551        let flow_id = self.config.flow_id;
552
553        debug!(
554            "Executing flow {flow_id}(expire_after={:?} secs) with query {}",
555            self.config.expire_after, &plan
556        );
557
558        let catalog = &self.config.sink_table_name[0];
559        let schema = &self.config.sink_table_name[1];
560
561        // fix all table ref by make it fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name"
562        let plan = plan
563            .clone()
564            .transform_down_with_subqueries(|p| {
565                if let LogicalPlan::TableScan(mut table_scan) = p {
566                    let resolved = table_scan.table_name.resolve(catalog, schema);
567                    table_scan.table_name = resolved.into();
568                    Ok(Transformed::yes(LogicalPlan::TableScan(table_scan)))
569                } else {
570                    Ok(Transformed::no(p))
571                }
572            })
573            .with_context(|_| DatafusionSnafu {
574                context: format!("Failed to fix table ref in logical plan, plan={:?}", plan),
575            })?
576            .data;
577
578        // For incremental-mode SQL queries, attempt to rewrite the delta aggregate
579        // plan into a safe delta-LEFT-JOIN-sink form before deciding on extensions.
580        let incremental_plan = if coverage.is_incremental_delta() {
581            self.prepare_plan_for_incremental(&plan).await?
582        } else {
583            None
584        };
585        let incremental_safe = incremental_plan.is_some();
586        if coverage.is_incremental_delta() && !incremental_safe {
587            warn!(
588                "Flow {flow_id} skipped unsafe incremental delta fallback; \
589                 restored dirty signal instead of executing an unfiltered full snapshot"
590            );
591            self.restore_dirty_windows(dirty_restore);
592            return Ok(None);
593        }
594        let plan = incremental_plan.unwrap_or_else(|| plan.clone());
595
596        let extensions = self
597            .build_flow_query_extensions(incremental_safe, coverage.is_incremental_delta())
598            .await?;
599        let extension_refs = extensions
600            .iter()
601            .map(|(key, value)| (*key, value.as_str()))
602            .collect::<Vec<_>>();
603        let query_mode = if extensions
604            .iter()
605            .any(|(key, _)| *key == FLOW_INCREMENTAL_MODE)
606        {
607            CheckpointMode::Incremental
608        } else {
609            CheckpointMode::FullSnapshot
610        };
611        Self::record_query_mode(flow_id, query_mode);
612        debug!(
613            "Flow {flow_id} executing batching query with checkpoint_mode={}, extension_count={}",
614            checkpoint_mode_label(query_mode),
615            extensions.len()
616        );
617
618        let mut peer_desc = None;
619        let res = {
620            let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
621                .with_label_values(&[flow_id.to_string().as_str()])
622                .start_timer();
623
624            let req = if let Some((insert_to, insert_input_plan)) =
625                breakup_insert_plan(&plan, catalog, schema)
626            {
627                if query_mode == CheckpointMode::FullSnapshot
628                    && matches!(self.config.query_type, QueryType::Sql)
629                    && self.config.flow_eval_interval.is_some()
630                    && self.config.time_window_expr.is_none()
631                {
632                    // Evaluation-interval SQL flows without a time-window
633                    // expression execute as full-query snapshots. Send these
634                    // as SQL text instead of Substrait to avoid logical-plan
635                    // round-trip issues around complex joins/unions/CTEs and
636                    // duplicate field aliases. Keep ordinary SQL full snapshots
637                    // on the existing InsertIntoPlan path because SQL unparsing
638                    // is not valid for every planned aggregate shape yet.
639                    // If the local SQL unparser does not support this plan,
640                    // keep the previous InsertIntoPlan transport as a fallback.
641                    match df_plan_to_sql(&insert_input_plan) {
642                        Ok(select_sql) => {
643                            let target_columns = format_insert_target_columns(&insert_input_plan);
644                            let sql = format!(
645                                "INSERT INTO {} ({}) {}",
646                                TableReference::full(
647                                    insert_to.catalog_name.as_str(),
648                                    insert_to.schema_name.as_str(),
649                                    insert_to.table_name.as_str(),
650                                )
651                                .to_quoted_string(),
652                                target_columns,
653                                select_sql
654                            );
655                            api::v1::QueryRequest {
656                                query: Some(api::v1::query_request::Query::Sql(sql)),
657                            }
658                        }
659                        Err(err) => {
660                            warn!(
661                                "Failed to unparse full-snapshot SQL flow {} plan; \
662                                 falling back to InsertIntoPlan: {:?}",
663                                flow_id, err
664                            );
665                            encode_insert_plan_request(insert_to, &insert_input_plan)?
666                        }
667                    }
668                } else {
669                    encode_insert_plan_request(insert_to, &insert_input_plan)?
670                }
671            } else {
672                let message = DFLogicalSubstraitConvertor {}
673                    .encode(&plan, DefaultSerializer)
674                    .context(SubstraitEncodeLogicalPlanSnafu)?;
675
676                api::v1::QueryRequest {
677                    query: Some(api::v1::query_request::Query::LogicalPlan(message.to_vec())),
678                }
679            };
680
681            let snapshot_seqs = coverage.snapshot_seqs();
682            frontend_client
683                .query_with_terminal_metrics(
684                    catalog,
685                    schema,
686                    req,
687                    &extension_refs,
688                    &snapshot_seqs,
689                    &mut peer_desc,
690                )
691                .await
692        };
693
694        let elapsed = instant.elapsed();
695        let peer_label = peer_desc
696            .as_ref()
697            .map(ToString::to_string)
698            .unwrap_or_else(|| PeerDesc::default().to_string());
699        if let Err(err) = &res {
700            warn!(
701                "Failed to execute Flow {flow_id} on frontend {peer_label}, result: {err:?}, elapsed: {:?} with query: {}",
702                elapsed, &plan
703            );
704            let decision = {
705                let mut state = self.state.write().unwrap();
706                let reason = Self::query_failure_reason(err, coverage);
707                Self::apply_query_failure_to_state(&mut state, elapsed, coverage, reason)
708            };
709            if let Some(decision) = decision {
710                Self::record_checkpoint_decision(flow_id, decision);
711            }
712        }
713
714        // record slow query
715        if elapsed >= self.config.batch_opts.slow_query_threshold {
716            warn!(
717                "Flow {flow_id} on frontend {peer_label} executed for {:?} before complete, query: {}",
718                elapsed, &plan
719            );
720            let flow_id = flow_id.to_string();
721            METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
722                .with_label_values(&[flow_id.as_str(), peer_label.as_str()])
723                .observe(elapsed.as_secs_f64());
724        }
725
726        let res = res?;
727        let (affected_rows, _) = res.output.extract_rows_and_cost();
728        debug!(
729            "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}, watermark: {:?}",
730            elapsed,
731            res.region_watermark_map()
732        );
733        METRIC_FLOW_ROWS
734            .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
735            .inc_by(affected_rows as _);
736        let decision = {
737            let mut state = self.state.write().unwrap();
738            Self::apply_query_result_to_state(&mut state, &res, elapsed, coverage)
739        };
740        Self::record_checkpoint_decision(flow_id, decision);
741
742        Ok(Some((affected_rows, elapsed)))
743    }
744
745    /// Restore dirty windows consumed by a failed query so they are retried on
746    /// the next execution.
747    ///
748    fn restore_dirty_windows(&self, dirty_restore: &DirtyRestore) {
749        match dirty_restore {
750            DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter),
751            DirtyRestore::Unscoped(dirty_windows) => self
752                .state
753                .write()
754                .unwrap()
755                .dirty_time_windows
756                .add_dirty_windows(dirty_windows),
757        }
758    }
759
760    /// Restore the dirty signal for a plan that was generated but failed before
761    /// it could prove any checkpoint advancement.
762    fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) {
763        self.restore_dirty_windows(&query.dirty_restore);
764    }
765
766    /// Restore scoped windows through `TaskState` so fenced repair can decide
767    /// whether they go back to pending repair or live dirty state.
768    fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) {
769        self.state.write().unwrap().restore_scoped_windows(filter);
770    }
771
772    /// Run a fallible scoped operation and restore its consumed windows if plan
773    /// generation/rewrite fails before execution.
774    fn restore_scoped_dirty_windows_on_err<T>(
775        &self,
776        filter: &FilterExprInfo,
777        result: Result<T, Error>,
778    ) -> Result<T, Error> {
779        result.inspect_err(|_| {
780            self.restore_scoped_dirty_windows(filter);
781        })
782    }
783
784    /// Restore an unscoped dirty signal consumed by an explicit full-query or
785    /// incremental-delta plan.
786    fn restore_unscoped_dirty_windows(&self, dirty_windows: &DirtyTimeWindows) {
787        self.state
788            .write()
789            .unwrap()
790            .dirty_time_windows
791            .add_dirty_windows(dirty_windows);
792    }
793
794    /// Run a fallible unscoped operation and restore the dirty signal if it
795    /// fails before a query is executed.
796    fn restore_unscoped_dirty_windows_on_err<T>(
797        &self,
798        dirty_windows: &DirtyTimeWindows,
799        result: Result<T, Error>,
800    ) -> Result<T, Error> {
801        result.inspect_err(|_| {
802            self.restore_unscoped_dirty_windows(dirty_windows);
803        })
804    }
805
806    /// Consume the live dirty signal for an unscoped query while keeping a copy
807    /// that can be restored if planning or execution fails.
808    fn drain_dirty_windows_signal(&self) -> (bool, DirtyTimeWindows) {
809        let mut state = self.state.write().unwrap();
810        let dirty_windows_to_restore = state.dirty_time_windows.clone();
811        let is_dirty = !dirty_windows_to_restore.is_empty();
812        state.dirty_time_windows.clean();
813        (is_dirty, dirty_windows_to_restore)
814    }
815
816    #[allow(clippy::too_many_arguments)]
817    /// Build an unfiltered plan for explicit full-query or incremental-delta
818    /// coverage. Callers pass the consumed dirty signal for failure restoration.
819    async fn gen_unfiltered_plan_info(
820        &self,
821        engine: QueryEngineRef,
822        query_ctx: QueryContextRef,
823        sink_table_schema: Arc<Schema>,
824        primary_key_indices: &[usize],
825        allow_partial: bool,
826        dirty_windows_to_restore: DirtyTimeWindows,
827        retention_filter: Option<(&str, Timestamp, &'static str)>,
828        coverage: QueryCoverage,
829    ) -> Result<PlanInfo, Error> {
830        let mut plan = self.restore_unscoped_dirty_windows_on_err(
831            &dirty_windows_to_restore,
832            gen_plan_with_matching_schema(
833                &self.config.query,
834                query_ctx,
835                engine,
836                sink_table_schema,
837                primary_key_indices,
838                allow_partial,
839            )
840            .await,
841        )?;
842
843        if let Some((col_name, lower_bound, context)) = retention_filter {
844            let lower = self.restore_unscoped_dirty_windows_on_err(
845                &dirty_windows_to_restore,
846                to_df_literal(lower_bound),
847            )?;
848            let retention_filter = col(col_name).gt_eq(lit(lower));
849            let mut add_filter = AddFilterRewriter::new(retention_filter);
850            plan = self.restore_unscoped_dirty_windows_on_err(
851                &dirty_windows_to_restore,
852                plan.clone()
853                    .rewrite(&mut add_filter)
854                    .with_context(|_| DatafusionSnafu {
855                        context: format!(
856                            "Failed to apply {context} expire_after filter to plan:\n {}\n",
857                            plan
858                        ),
859                    })
860                    .map(|rewrite| rewrite.data),
861            )?;
862        }
863
864        Ok(PlanInfo {
865            plan,
866            dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
867            coverage,
868        })
869    }
870
871    #[allow(clippy::too_many_arguments)]
872    /// Build an unfiltered plan only when the live dirty signal was present;
873    /// otherwise skip this round without querying.
874    async fn gen_unfiltered_plan_info_if_dirty(
875        &self,
876        engine: QueryEngineRef,
877        query_ctx: QueryContextRef,
878        sink_table_schema: Arc<Schema>,
879        primary_key_indices: &[usize],
880        allow_partial: bool,
881        retention_filter: Option<(&str, Timestamp, &'static str)>,
882        coverage: QueryCoverage,
883    ) -> Result<Option<PlanInfo>, Error> {
884        let (is_dirty, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
885        if !is_dirty {
886            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
887            return Ok(None);
888        }
889
890        self.gen_unfiltered_plan_info(
891            engine,
892            query_ctx,
893            sink_table_schema,
894            primary_key_indices,
895            allow_partial,
896            dirty_windows_to_restore,
897            retention_filter,
898            coverage,
899        )
900        .await
901        .map(Some)
902    }
903
904    fn handle_executed_query_failure(&self, query: Option<&PlanInfo>) {
905        if let Some(query) = query {
906            self.restore_dirty_windows_after_failure(query);
907        }
908    }
909
910    /// start executing query in a loop, break when receive shutdown signal
911    ///
912    /// any error will be logged when executing query
913    pub async fn start_executing_loop(
914        &self,
915        engine: QueryEngineRef,
916        frontend_client: Arc<FrontendClient>,
917    ) {
918        let flow_id_str = self.config.flow_id.to_string();
919        let mut max_window_cnt = None;
920        let mut interval = self
921            .config
922            .flow_eval_interval
923            .map(|d| tokio::time::interval(d));
924        if let Some(tick) = &mut interval {
925            tick.tick().await; // pass the first tick immediately
926        }
927        loop {
928            // first check if shutdown signal is received
929            // if so, break the loop
930            {
931                let mut state = self.state.write().unwrap();
932                match state.shutdown_rx.try_recv() {
933                    Ok(()) => break,
934                    Err(TryRecvError::Closed) => {
935                        warn!(
936                            "Unexpected shutdown flow {}, shutdown anyway",
937                            self.config.flow_id
938                        );
939                        break;
940                    }
941                    Err(TryRecvError::Empty) => (),
942                }
943            }
944            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
945                .with_label_values(&[&flow_id_str])
946                .inc();
947
948            let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
949
950            let outcome = self
951                .execute_once_serialized_with_outcome(&engine, &frontend_client, max_window_cnt)
952                .await;
953
954            match outcome.result {
955                // normal execute, sleep for some time before doing next query
956                Ok(Some(_)) => {
957                    // can increase max_window_cnt to query more windows next time
958                    max_window_cnt = max_window_cnt.map(|cnt| {
959                        (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
960                    });
961
962                    // here use proper ticking if set eval interval
963                    if let Some(eval_interval) = &mut interval {
964                        eval_interval.tick().await;
965                    } else {
966                        // if not explicitly set, just automatically calculate next start time
967                        // using time window size and more args
968                        let sleep_until = {
969                            let state = self.state.write().unwrap();
970
971                            let time_window_size = self
972                                .config
973                                .time_window_expr
974                                .as_ref()
975                                .and_then(|t| *t.time_window_size());
976
977                            let prefer_short_incremental_cadence = state.checkpoint_mode()
978                                == CheckpointMode::Incremental
979                                && !state.is_incremental_disabled();
980
981                            state.get_next_start_query_time(
982                                self.config.flow_id,
983                                &time_window_size,
984                                min_refresh,
985                                Some(self.config.batch_opts.query_timeout),
986                                self.config.batch_opts.experimental_max_filter_num_per_query,
987                                prefer_short_incremental_cadence,
988                            )
989                        };
990
991                        tokio::time::sleep_until(sleep_until).await;
992                    };
993                }
994                // no new data, sleep for some time before checking for new data
995                Ok(None) => {
996                    debug!(
997                        "Flow id = {:?} found no new data, sleep for {:?} then continue",
998                        self.config.flow_id, min_refresh
999                    );
1000                    tokio::time::sleep(min_refresh).await;
1001                    continue;
1002                }
1003                // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
1004                Err(err) => {
1005                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
1006                        .with_label_values(&[&flow_id_str])
1007                        .inc();
1008                    match outcome.new_query {
1009                        Some(query) => {
1010                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
1011                            // TODO(discord9): add some backoff here? half the query time window or what
1012                            // backoff meaning use smaller `max_window_cnt` for next query
1013
1014                            // since last query failed, we should not try to query too many windows
1015                            max_window_cnt = Some(1);
1016                        }
1017                        None => {
1018                            common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
1019                        }
1020                    }
1021                    // also sleep for a little while before try again to prevent flooding logs
1022                    tokio::time::sleep(min_refresh).await;
1023                }
1024            }
1025        }
1026    }
1027
1028    /// Generate the create table SQL
1029    ///
1030    /// the auto created table will automatically added a `update_at` Milliseconds DEFAULT now() column in the end
1031    /// (for compatibility with flow streaming mode)
1032    ///
1033    /// and it will use first timestamp column as time index, all other columns will be added as normal columns and nullable
1034    async fn gen_create_table_expr(
1035        &self,
1036        engine: QueryEngineRef,
1037    ) -> Result<CreateTableExpr, Error> {
1038        let query_ctx = self.state.read().unwrap().query_ctx.clone();
1039        let plan =
1040            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
1041        create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
1042    }
1043
1044    /// Incremental delta scans are unfiltered by dirty windows; the sequence
1045    /// range, not a time predicate, defines source correctness.
1046    fn should_use_unfiltered_incremental_delta(&self) -> bool {
1047        let state = self.state.read().unwrap();
1048        state.checkpoint_mode() == CheckpointMode::Incremental
1049            && !state.is_incremental_disabled()
1050            && matches!(self.config.query_type, QueryType::Sql)
1051    }
1052
1053    /// Generate the next plan and classify its coverage so checkpoint handling
1054    /// knows whether it is full-query, scoped repair, fenced repair, or delta.
1055    async fn gen_query_with_time_window(
1056        &self,
1057        engine: QueryEngineRef,
1058        sink_table_schema: &Arc<Schema>,
1059        primary_key_indices: &[usize],
1060        allow_partial: bool,
1061        max_window_cnt: Option<usize>,
1062    ) -> Result<Option<PlanInfo>, Error> {
1063        let query_ctx = self.state.read().unwrap().query_ctx.clone();
1064        let start = SystemTime::now();
1065        let since_the_epoch = start
1066            .duration_since(UNIX_EPOCH)
1067            .expect("Time went backwards");
1068        let low_bound = self
1069            .config
1070            .expire_after
1071            .map(|e| since_the_epoch.as_secs() - e as u64)
1072            .unwrap_or(u64::MIN);
1073
1074        let low_bound = Timestamp::new_second(low_bound as i64);
1075
1076        let expire_time_window_bound = self
1077            .config
1078            .time_window_expr
1079            .as_ref()
1080            .map(|expr| expr.eval(low_bound))
1081            .transpose()?;
1082
1083        let (expire_lower_bound, expire_upper_bound) = match (
1084            expire_time_window_bound,
1085            &self.config.query_type,
1086        ) {
1087            (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
1088            (None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => {
1089                return UnexpectedSnafu {
1090                    reason: format!(
1091                        "Flow id={} reached runtime without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it",
1092                        self.config.flow_id
1093                    ),
1094                }
1095                .fail();
1096            }
1097            _ => {
1098                // Explicit full-query flows (TQL and evaluation-interval SQL
1099                // plans whose shape cannot be safely dirty-window pruned) are
1100                // allowed to run as unfiltered full snapshots. This is distinct
1101                // from using unfiltered full as a fallback after scoped repair or
1102                // incremental rewrite failed.
1103                let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
1104
1105                let plan_info = self
1106                    .gen_unfiltered_plan_info(
1107                        engine,
1108                        query_ctx,
1109                        sink_table_schema.clone(),
1110                        primary_key_indices,
1111                        allow_partial,
1112                        dirty_windows_to_restore,
1113                        None,
1114                        QueryCoverage::UnfilteredFull,
1115                    )
1116                    .await?;
1117
1118                return Ok(Some(plan_info));
1119            }
1120        };
1121
1122        debug!(
1123            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
1124            self.config.flow_id,
1125            expire_lower_bound,
1126            expire_upper_bound,
1127            self.state.read().unwrap().dirty_time_windows
1128        );
1129        let window_size = expire_upper_bound
1130            .sub(&expire_lower_bound)
1131            .with_context(|| UnexpectedSnafu {
1132                reason: format!(
1133                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
1134                ),
1135            })?;
1136        let col_name = self
1137            .config
1138            .time_window_expr
1139            .as_ref()
1140            .map(|expr| expr.column_name.clone())
1141            .with_context(|| UnexpectedSnafu {
1142                reason: format!(
1143                    "Flow id={:?}, Failed to get column name from time window expr",
1144                    self.config.flow_id
1145                ),
1146            })?;
1147
1148        if self.should_use_unfiltered_incremental_delta() {
1149            // In incremental mode, source correctness is defined by the
1150            // per-region sequence range `(checkpoint, scan-open snapshot]`, not
1151            // by dirty-window predicates. Dirty windows are only a scheduling
1152            // signal here. Applying a stale dirty-window filter to the source can
1153            // exclude rows that are inside the returned watermark and make a
1154            // checkpoint advance skip them forever. The sink side is also left
1155            // unfiltered by dirty windows; the incremental rewrite joins the
1156            // delta groups with the full sink state for correctness. Future
1157            // dynamic filters can prune sink reads as a pure optimization.
1158            let retention_filter = self
1159                .config
1160                .expire_after
1161                .map(|_| (col_name.as_str(), expire_lower_bound, "incremental"));
1162            return self
1163                .gen_unfiltered_plan_info_if_dirty(
1164                    engine,
1165                    query_ctx,
1166                    sink_table_schema.clone(),
1167                    primary_key_indices,
1168                    allow_partial,
1169                    retention_filter,
1170                    QueryCoverage::IncrementalDelta,
1171                )
1172                .await;
1173        }
1174
1175        let (expr, coverage) = {
1176            let mut state = self.state.write().unwrap();
1177            let window_cnt = max_window_cnt
1178                .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query);
1179            let expr = state.gen_scoped_filter_exprs(
1180                &col_name,
1181                Some(expire_lower_bound),
1182                window_size,
1183                window_cnt,
1184                self.config.flow_id,
1185                Some(self),
1186            )?;
1187            let repair_high = state
1188                .pending_fenced_repair()
1189                .map(|repair| repair.high().clone());
1190            let coverage = if let Some(high) = repair_high {
1191                QueryCoverage::FencedRepairChunk { high }
1192            } else {
1193                QueryCoverage::ScopedBaseRepair
1194            };
1195            (expr, coverage)
1196        };
1197
1198        let Some(expr) = expr else {
1199            // no new data, hence no need to update
1200            debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
1201            return Ok(None);
1202        };
1203
1204        let filter_sql = expr_to_sql(&expr.expr)
1205            .map(|sql| sql.to_string())
1206            .unwrap_or_else(|err| format!("<failed to format filter expr: {err}>"));
1207
1208        debug!(
1209            "Flow id={:?}, Generated filter expr: {:?}",
1210            self.config.flow_id, filter_sql
1211        );
1212
1213        let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
1214        let mut add_auto_column = ColumnMatcherRewriter::new(
1215            sink_table_schema.clone(),
1216            primary_key_indices.to_vec(),
1217            allow_partial,
1218        );
1219
1220        let plan = self.restore_scoped_dirty_windows_on_err(
1221            &expr,
1222            sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await,
1223        )?;
1224        let rewrite = self.restore_scoped_dirty_windows_on_err(
1225            &expr,
1226            plan.clone()
1227                .rewrite(&mut add_filter)
1228                .and_then(|p| p.data.rewrite(&mut add_auto_column))
1229                .with_context(|_| DatafusionSnafu {
1230                    context: format!("Failed to rewrite plan:\n {}\n", plan),
1231                })
1232                .map(|rewrite| rewrite.data),
1233        )?;
1234        // only apply optimize after complex rewrite is done
1235        let new_plan = self.restore_scoped_dirty_windows_on_err(
1236            &expr,
1237            apply_df_optimizer(rewrite, &query_ctx).await,
1238        )?;
1239
1240        let info = PlanInfo {
1241            plan: new_plan.clone(),
1242            dirty_restore: DirtyRestore::Scoped(expr),
1243            coverage,
1244        };
1245
1246        Ok(Some(info))
1247    }
1248}
1249
1250#[cfg(test)]
1251mod test;