fix!: fence scoped flow repair snapshots (#8277)

* fix: fence scoped flow repair snapshots

Signed-off-by: discord9 <discord9@163.com>

* test: trim duplicate flow task tests

Signed-off-by: discord9 <discord9@163.com>

* test: moreless

Signed-off-by: discord9 <discord9@163.com>

* refactor: simplify flow query failure fallback

Signed-off-by: discord9 <discord9@163.com>

* test: update need eval interval

Signed-off-by: discord9 <discord9@163.com>

* docs: for helper fn

Signed-off-by: discord9 <discord9@163.com>

* chore: less test

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* test: update for eval interval

Signed-off-by: discord9 <discord9@163.com>

* fix: consume dirty windows after successful query

Signed-off-by: discord9 <discord9@163.com>

* test: rm useless tests

Signed-off-by: discord9 <discord9@163.com>

* test: standalone seq&rm dead if

Signed-off-by: discord9 <discord9@163.com>

* chore: move to pending window instead

Signed-off-by: discord9 <discord9@163.com>

* chore: mark full also call abandon_fenced_repair

Signed-off-by: discord9 <discord9@163.com>

* chore: instant for pending fenced repair

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-12 21:27:12 +08:00
committed by GitHub
parent 4a4fe749d8
commit f6f56e6bc5
19 changed files with 1499 additions and 369 deletions

View File

@@ -16,6 +16,7 @@ use crate::batching_mode::state::CheckpointMode;
pub(super) const CHECKPOINT_DECISION_ADVANCE: &str = "advance";
pub(super) const CHECKPOINT_DECISION_FALLBACK: &str = "fallback";
pub(super) const CHECKPOINT_DECISION_CONTINUE_REPAIR: &str = "continue_repair";
pub(super) const CHECKPOINT_REASON_NONE: &str = "none";
/// Why the task fell back to full snapshot mode.
@@ -34,9 +35,16 @@ pub(super) enum FlowQueryFallbackReason {
/// The datanode detected a stale incremental cursor and the Flow
/// must recompute from scratch.
StaleCursor,
/// A fenced repair chunk tried to use a snapshot upper bound that the
/// storage engine can no longer enforce, so the current repair high must
/// be abandoned and rebound by a fresh scoped full snapshot repair.
SnapshotFenceExpired,
/// A non-stale-cursor query failure; the Flow resets to full snapshot
/// to avoid cascading errors.
IncrementalQueryFailure,
/// A non-incremental query failed while the task was already in full
/// snapshot or scoped repair mode.
QueryFailure,
/// Incremental mode has been permanently disabled for this Flow
/// (e.g. because the query shape is not incrementally safe).
IncrementalDisabled,
@@ -49,7 +57,9 @@ impl FlowQueryFallbackReason {
Self::IncompleteRegionWatermark => "incomplete_region_watermark",
Self::DirtyBacklogPending => "dirty_backlog_pending",
Self::StaleCursor => "stale_cursor",
Self::SnapshotFenceExpired => "snapshot_fence_expired",
Self::IncrementalQueryFailure => "incremental_query_failure",
Self::QueryFailure => "query_failure",
Self::IncrementalDisabled => "incremental_disabled",
}
}
@@ -77,6 +87,14 @@ pub(super) enum FlowCheckpointDecision {
participating_regions: usize,
watermarks: usize,
},
/// FullSnapshot stayed in full snapshot mode because a scoped base repair
/// found additional dirty windows that may be concurrent with the returned
/// high watermark. These windows must be repaired under the fixed high
/// before checkpoints can advance.
ContinuedFencedRepair {
pending_windows: usize,
watermarks: usize,
},
/// Any mode → FullSnapshot.
///
/// Watermark information was incomplete, a participating region was
@@ -96,6 +114,13 @@ impl FlowCheckpointDecision {
checkpoint_mode_label(CheckpointMode::FullSnapshot)
}
Self::AdvancedIncremental { .. } => checkpoint_mode_label(CheckpointMode::Incremental),
// Fenced repair is intentionally a FullSnapshot sub-state, not a
// third top-level checkpoint mode, so metrics keep the
// `full_snapshot` mode label while the decision label carries
// `continue_repair`.
Self::ContinuedFencedRepair { .. } => {
checkpoint_mode_label(CheckpointMode::FullSnapshot)
}
Self::FallbackToFullSnapshot { previous_mode, .. } => {
checkpoint_mode_label(previous_mode)
}
@@ -107,6 +132,7 @@ impl FlowCheckpointDecision {
Self::AdvancedFromFullSnapshot { .. } | Self::AdvancedIncremental { .. } => {
CHECKPOINT_DECISION_ADVANCE
}
Self::ContinuedFencedRepair { .. } => CHECKPOINT_DECISION_CONTINUE_REPAIR,
Self::FallbackToFullSnapshot { .. } => CHECKPOINT_DECISION_FALLBACK,
}
}

View File

@@ -458,6 +458,22 @@ impl BatchingEngine {
.is_some_and(|value| value.eq_ignore_ascii_case("true"))
}
/// SQL flows without a usable time-window expression can only run as an
/// explicit full-query flow, so require `EVAL INTERVAL` at creation time.
fn ensure_sql_flow_has_twe_or_eval_interval(
eval_interval: Option<i64>,
has_time_window_expr: bool,
) -> Result<(), Error> {
ensure!(
eval_interval.is_some() || has_time_window_expr,
InvalidQuerySnafu {
reason: "SQL batching flow without a time-window expression must specify EVAL INTERVAL to run as an explicit full-query flow"
.to_string(),
}
);
Ok(())
}
fn ensure_incremental_source_append_only(
batch_opts: &BatchingModeOptions,
table_name: &[String; 3],
@@ -609,6 +625,10 @@ impl BatchingEngine {
.unwrap_or("None".to_string())
);
if !is_tql {
Self::ensure_sql_flow_has_twe_or_eval_interval(eval_interval, phy_expr.is_some())?;
}
let task_args = TaskArgs {
flow_id,
query: &sql,
@@ -1035,6 +1055,62 @@ mod tests {
));
}
#[test]
fn test_sql_flow_requires_time_window_or_eval_interval() {
BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(None, true)
.expect("SQL flow with a time-window expression should be accepted");
BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(Some(10), false).expect(
"SQL flow with EVAL INTERVAL should be accepted as an explicit full-query flow",
);
let err = BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(None, false)
.expect_err("SQL flow without a time-window expression or EVAL INTERVAL should fail");
assert!(matches!(err, Error::InvalidQuery { .. }), "{err}");
assert!(
err.to_string().contains("must specify EVAL INTERVAL"),
"{err}"
);
}
#[tokio::test]
async fn test_complex_sql_without_eval_interval_is_rejected_as_no_twe() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let plan = sql_to_df_plan(
ctx.clone(),
query_engine.clone(),
r#"
SELECT
l.number,
date_bin('5 minutes', l.ts) AS time_window
FROM numbers_with_ts l
JOIN numbers_with_ts r ON l.number = r.number
GROUP BY l.number, time_window
"#,
true,
)
.await
.unwrap();
let (_, time_window_expr, _, _) = find_time_window_expr(
&plan,
query_engine.engine_state().catalog_manager().clone(),
ctx,
)
.await
.unwrap();
assert!(
time_window_expr.is_none(),
"complex SQL should be classified as having no safe TWE"
);
BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(Some(10), false)
.expect("complex SQL can run as an explicit full-query flow when EVAL INTERVAL is set");
let err = BatchingEngine::ensure_sql_flow_has_twe_or_eval_interval(None, false)
.expect_err("complex SQL without EVAL INTERVAL should fail creation");
assert!(matches!(err, Error::InvalidQuery { .. }), "{err}");
}
#[test]
fn test_incremental_source_append_only_enforcement() {
let table_name = [

View File

@@ -15,7 +15,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Mutex, RwLock, Weak};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
@@ -25,7 +25,6 @@ use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
use common_meta::peer::{Peer, PeerDiscovery};
use common_query::{Output, OutputData};
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
use common_telemetry::warn;
use futures::stream::{FuturesUnordered, StreamExt};
use meta_client::client::MetaClient;
@@ -388,12 +387,15 @@ impl FrontendClient {
}
}
/// Execute a flow query and return terminal metrics. `snapshot_seqs` are
/// optional read upper bounds used only by snapshot-fenced repair chunks.
pub(crate) async fn query_with_terminal_metrics(
&self,
catalog: &str,
schema: &str,
request: QueryRequest,
extensions: &[(&str, &str)],
snapshot_seqs: &HashMap<u64, u64>,
peer_desc: &mut Option<PeerDesc>,
) -> Result<OutputWithMetrics, Error> {
let flow_extensions = build_flow_extensions(extensions)?;
@@ -415,7 +417,7 @@ impl FrontendClient {
request,
&hints,
extensions,
&Default::default(),
snapshot_seqs,
)
.await
.map_err(BoxedError::new)
@@ -437,6 +439,7 @@ impl FrontendClient {
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.extensions(extensions_map)
.snapshot_seqs(Arc::new(RwLock::new(snapshot_seqs.clone())))
.build();
let ctx = Arc::new(ctx);
let database_client = {
@@ -462,7 +465,7 @@ impl FrontendClient {
.do_query(Request::Query(request), ctx.clone())
.await
.map(|output| {
wrap_standalone_output_with_terminal_metrics(output, &flow_extensions, &ctx)
wrap_standalone_output_with_terminal_metrics(output, &flow_extensions)
})
.map_err(BoxedError::new)
.context(ExternalSnafu)
@@ -570,7 +573,6 @@ fn build_flow_extensions(extensions: &[(&str, &str)]) -> Result<FlowQueryExtensi
fn wrap_standalone_output_with_terminal_metrics(
output: Output,
flow_extensions: &FlowQueryExtensions,
query_ctx: &QueryContextRef,
) -> OutputWithMetrics {
let should_collect_region_watermark = flow_extensions.should_collect_region_watermark();
let terminal_metrics =
@@ -580,7 +582,6 @@ fn wrap_standalone_output_with_terminal_metrics(
.plan
.clone()
.and_then(terminal_recordbatch_metrics_from_plan)
.or_else(|| terminal_recordbatch_metrics_from_snapshots(query_ctx))
} else {
None
};
@@ -591,28 +592,6 @@ fn wrap_standalone_output_with_terminal_metrics(
result
}
fn terminal_recordbatch_metrics_from_snapshots(
query_ctx: &QueryContextRef,
) -> Option<RecordBatchMetrics> {
let mut region_watermarks = query_ctx
.snapshots()
.into_iter()
.map(|(region_id, watermark)| RegionWatermarkEntry {
region_id,
watermark: Some(watermark),
})
.collect::<Vec<_>>();
if region_watermarks.is_empty() {
return None;
}
region_watermarks.sort_by_key(|entry| entry.region_id);
Some(RecordBatchMetrics {
region_watermarks,
..Default::default()
})
}
/// Describe a peer of frontend
#[derive(Debug, Default, Clone)]
pub(crate) enum PeerDesc {
@@ -790,6 +769,8 @@ mod tests {
ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
assert_eq!(ctx.extension("flow.return_region_seq"), Some("true"));
assert_eq!(ctx.get_snapshot(1), Some(10));
assert_eq!(ctx.get_snapshot(2), Some(20));
ctx.set_snapshot(42, 99);
Ok(Output::new_with_affected_rows(1))
}
@@ -903,6 +884,7 @@ mod tests {
query: Some(Query::Sql("select 1".to_string())),
},
&[],
&HashMap::new(),
&mut peer_desc,
)
.await
@@ -940,6 +922,7 @@ mod tests {
query: Some(Query::Sql("insert into t select 1".to_string())),
},
&[("flow.return_region_seq", "true")],
&HashMap::new(),
&mut peer_desc,
)
.await
@@ -965,6 +948,7 @@ mod tests {
query: Some(Query::Sql("insert into t select * from src".to_string())),
},
&[("flow.return_region_seq", "true")],
&HashMap::from([(1, 10), (2, 20)]),
&mut peer_desc,
)
.await
@@ -972,10 +956,7 @@ mod tests {
assert!(matches!(peer_desc, Some(PeerDesc::Standalone)));
assert!(result.metrics.is_ready());
assert_eq!(
result.region_watermark_map(),
Some(HashMap::from([(42, 99)]))
);
assert_eq!(result.region_watermark_map(), None);
}
#[tokio::test]
@@ -993,6 +974,7 @@ mod tests {
query: Some(Query::Sql("select 1".to_string())),
},
&[("flow.return_region_seq", "not-a-bool")],
&HashMap::new(),
&mut peer_desc,
)
.await

View File

@@ -51,6 +51,7 @@ pub struct TaskState {
/// mapping of `start -> end` and non-overlapping
pub(crate) dirty_time_windows: DirtyTimeWindows,
checkpoint_mode: CheckpointMode,
pending_fenced_repair: Option<FencedRepair>,
/// Region id -> last consumed watermark sequence. Incremental scans use
/// this as the next lower sequence bound for each source region.
checkpoints: BTreeMap<u64, u64>,
@@ -81,6 +82,7 @@ impl TaskState {
last_exec_time_millis: None,
dirty_time_windows,
checkpoint_mode: CheckpointMode::FullSnapshot,
pending_fenced_repair: None,
checkpoints: Default::default(),
incremental_disabled: false,
exec_state: ExecState::Idle,
@@ -112,6 +114,12 @@ impl TaskState {
&self.checkpoints
}
/// Returns the in-progress fenced repair, if the task is repairing dirty
/// windows under a frozen full-snapshot high watermark.
pub fn pending_fenced_repair(&self) -> Option<&FencedRepair> {
self.pending_fenced_repair.as_ref()
}
pub fn is_incremental_disabled(&self) -> bool {
self.incremental_disabled
}
@@ -123,17 +131,25 @@ impl TaskState {
self.mark_full_snapshot();
}
/// Move back to top-level FullSnapshot mode. If a fenced repair is active,
/// restore its not-yet-in-flight pending windows to the live dirty queue so
/// the moved backlog is not lost.
pub fn mark_full_snapshot(&mut self) {
self.checkpoint_mode = CheckpointMode::FullSnapshot;
self.abandon_fenced_repair();
}
/// Replace full-snapshot checkpoints with a complete watermark proof.
/// Clears fenced repair state and enters Incremental unless disabled.
pub fn advance_checkpoints(&mut self, watermark_map: HashMap<u64, u64>) {
self.checkpoints = watermark_map.into_iter().collect();
self.pending_fenced_repair = None;
if !self.incremental_disabled {
self.checkpoint_mode = CheckpointMode::Incremental;
}
}
/// Advance only the participating regions for an incremental delta query.
/// This also clears any stale fenced repair sub-state.
pub fn advance_incremental_checkpoints_with_participation(
&mut self,
participating_regions: &BTreeSet<u64>,
@@ -147,8 +163,140 @@ impl TaskState {
if !self.incremental_disabled {
self.checkpoint_mode = CheckpointMode::Incremental;
}
self.pending_fenced_repair = None;
}
/// Start repairing the current live dirty windows under a frozen high `H`.
/// The current live backlog is moved into the fenced repair so successful
/// chunks are consumed from that backlog. New post-`H` dirty signals can
/// still arrive in the live queue while the fenced repair is active.
pub fn start_fenced_repair(&mut self, high: BTreeMap<u64, u64>) -> Option<&FencedRepair> {
if self.dirty_time_windows.is_empty() {
self.pending_fenced_repair = None;
return None;
}
let pending_windows = self.dirty_time_windows.clone();
self.dirty_time_windows.clean();
self.pending_fenced_repair = Some(FencedRepair {
high,
pending_windows,
});
self.checkpoint_mode = CheckpointMode::FullSnapshot;
self.pending_fenced_repair.as_ref()
}
/// Finish the fenced repair and promote the frozen high watermark to the
/// checkpoint map. Incremental-disabled flows stay in FullSnapshot mode.
pub fn finish_fenced_repair(&mut self) -> Option<BTreeMap<u64, u64>> {
let repair = self.pending_fenced_repair.take()?;
self.checkpoints = repair.high;
if !self.incremental_disabled {
self.checkpoint_mode = CheckpointMode::Incremental;
}
Some(self.checkpoints.clone())
}
/// Abandon the current fenced repair and restore all not-yet-in-flight
/// pending windows to the live dirty queue for a fresh scoped repair.
pub fn abandon_fenced_repair(&mut self) -> bool {
self.checkpoint_mode = CheckpointMode::FullSnapshot;
let Some(repair) = self.pending_fenced_repair.take() else {
return false;
};
self.dirty_time_windows
.add_dirty_windows(&repair.pending_windows);
true
}
/// Restore a scoped query's windows after a failed or unproven run. During
/// an active fenced repair this requeues into `pending_windows`; otherwise
/// it restores to the live dirty queue.
pub fn restore_scoped_windows(&mut self, filter: &FilterExprInfo) {
if let Some(repair) = self.pending_fenced_repair.as_mut() {
repair
.pending_windows
.add_windows(filter.time_ranges.clone());
return;
}
self.dirty_time_windows
.add_windows(filter.time_ranges.clone());
}
/// Generate the next scoped filter from the fenced-repair queue when active;
/// otherwise consume windows from the live dirty queue.
pub fn gen_scoped_filter_exprs(
&mut self,
col_name: &str,
expire_lower_bound: Option<Timestamp>,
window_size: chrono::Duration,
window_cnt: usize,
flow_id: FlowId,
task_ctx: Option<&BatchingTask>,
) -> Result<Option<FilterExprInfo>, Error> {
if let Some(repair) = self.pending_fenced_repair.as_mut() {
let expr = repair.pending_windows.gen_filter_exprs(
col_name,
expire_lower_bound,
window_size,
window_cnt,
flow_id,
task_ctx,
)?;
if expr.is_some() || !repair.pending_windows.is_empty() {
return Ok(expr);
}
// All pending repair windows may have expired during merge. Clear
// the empty repair so this call can fall back to live dirty windows
// instead of routing future executions to an empty queue forever.
self.pending_fenced_repair = None;
}
self.dirty_time_windows.gen_filter_exprs(
col_name,
expire_lower_bound,
window_size,
window_cnt,
flow_id,
task_ctx,
)
}
/// Returns true only when the query result's participating regions and
/// terminal watermarks exactly match the fenced repair's frozen high `H`.
pub fn fenced_repair_watermarks_match_high(
&self,
participating_regions: &BTreeSet<u64>,
watermark_map: &HashMap<u64, u64>,
) -> bool {
let Some(repair) = self.pending_fenced_repair.as_ref() else {
return false;
};
!participating_regions.is_empty()
&& participating_regions.len() == repair.high.len()
&& watermark_map.len() == repair.high.len()
&& participating_regions.iter().all(|region_id| {
repair
.high
.get(region_id)
.zip(watermark_map.get(region_id))
.is_some_and(|(high, watermark)| high == watermark)
})
}
/// Whether the active fenced repair has drained all pending windows.
pub fn fenced_repair_pending_is_empty(&self) -> bool {
self.pending_fenced_repair
.as_ref()
.is_some_and(|repair| repair.pending_windows.is_empty())
}
/// Full-snapshot checkpoint advances require a watermark for every region
/// that participated in the query.
pub fn can_advance_full_snapshot_checkpoints(
&self,
participating_regions: &BTreeSet<u64>,
@@ -161,6 +309,8 @@ impl TaskState {
.all(|region_id| watermark_map.contains_key(region_id))
}
/// Incremental advances are limited to participating regions whose returned
/// watermark is not older than the stored checkpoint.
pub fn can_advance_incremental_checkpoints_with_participation(
&self,
participating_regions: &BTreeSet<u64>,
@@ -191,6 +341,9 @@ impl TaskState {
///
/// if current the dirty time range is longer than one query can handle,
/// execute immediately to faster clean up dirty time windows.
/// Active fenced repairs also execute immediately while pending windows
/// remain: the current backlog has moved out of live dirty windows and into
/// `pending_fenced_repair.pending_windows`.
///
/// If `prefer_short_incremental_cadence` is true, run incremental queries
/// more often when there is no large dirty backlog. This only reduces the
@@ -214,6 +367,18 @@ impl TaskState {
next_duration
};
if self
.pending_fenced_repair
.as_ref()
.is_some_and(|repair| !repair.pending_windows().is_empty())
{
debug!(
"Flow id = {}, active fenced repair still has pending windows, execute immediately",
flow_id,
);
return Instant::now();
}
let cur_dirty_window_size = self.dirty_time_windows.window_size();
// compute how much time range can be handled in one query
let max_query_update_range = (*time_window_size)
@@ -721,6 +886,26 @@ pub enum CheckpointMode {
Incremental,
}
/// Dirty windows that must be repaired under a frozen full-snapshot watermark.
/// This is a FullSnapshot sub-state, not a separate checkpoint mode.
#[derive(Debug, Clone)]
pub struct FencedRepair {
high: BTreeMap<u64, u64>,
pending_windows: DirtyTimeWindows,
}
impl FencedRepair {
/// Frozen high watermark `H` used as the snapshot upper bound for chunks.
pub fn high(&self) -> &BTreeMap<u64, u64> {
&self.high
}
/// Dirty windows still waiting to be repaired under `high`.
pub fn pending_windows(&self) -> &DirtyTimeWindows {
&self.pending_windows
}
}
/// Filter Expression's information
#[derive(Debug, Clone)]
pub struct FilterExprInfo {
@@ -1029,6 +1214,38 @@ mod test {
);
}
#[test]
fn test_mark_full_snapshot_restores_pending_fenced_repair_windows() {
let query_ctx = QueryContext::arc();
let (_tx, rx) = tokio::sync::oneshot::channel();
let mut state = TaskState::new(query_ctx, rx);
state
.dirty_time_windows
.add_window(Timestamp::new_second(10), Some(Timestamp::new_second(15)));
state
.dirty_time_windows
.add_window(Timestamp::new_second(100), Some(Timestamp::new_second(105)));
state
.start_fenced_repair(BTreeMap::from([(1_u64, 10_u64)]))
.unwrap();
assert!(state.dirty_time_windows.is_empty());
assert_eq!(
state
.pending_fenced_repair()
.unwrap()
.pending_windows()
.len(),
2
);
state.mark_full_snapshot();
assert_eq!(state.checkpoint_mode(), CheckpointMode::FullSnapshot);
assert!(state.pending_fenced_repair().is_none());
assert_eq!(state.dirty_time_windows.len(), 2);
}
#[test]
fn test_disable_incremental_persists_full_snapshot_mode() {
let query_ctx = QueryContext::arc();
@@ -1339,6 +1556,33 @@ mod test {
);
}
#[test]
fn test_pending_fenced_repair_schedules_immediately() {
let mut state = state_with_past_update(Duration::from_secs(10));
state
.dirty_time_windows
.add_window(Timestamp::new_second(0), Some(Timestamp::new_second(5)));
state
.start_fenced_repair(BTreeMap::from([(1_u64, 10_u64)]))
.unwrap();
assert!(state.dirty_time_windows.is_empty());
assert!(!state.fenced_repair_pending_is_empty());
let result = state.get_next_start_query_time(
1,
&Some(Duration::from_secs(60)),
Duration::from_secs(5),
None,
20,
false,
);
assert!(
result <= Instant::now(),
"pending fenced repair backlog should schedule immediately"
);
}
#[test]
fn test_incremental_disabled_ignores_short_cadence() {
// When prefer_short_incremental_cadence is true but the dirty backlog is

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -161,7 +161,42 @@ pub struct TaskArgs<'a> {
pub struct PlanInfo {
pub plan: LogicalPlan,
pub dirty_restore: DirtyRestore,
pub can_advance_checkpoints: bool,
pub coverage: QueryCoverage,
}
#[derive(Clone)]
pub enum QueryCoverage {
/// Explicit full-query snapshot coverage, e.g. TQL or evaluation-interval
/// SQL flows whose plan shape cannot be safely dirty-window pruned. This
/// must not be used as an implicit recovery path for scoped repair or an
/// unsafe incremental rewrite fallback.
UnfilteredFull,
/// Scoped full-snapshot repair over the current dirty windows. A successful
/// result may start a fenced repair if new dirty windows appeared meanwhile.
ScopedBaseRepair,
/// A chunk of windows being repaired under the frozen high-watermark `H`.
/// The `high` map is sent as snapshot read bounds and must be matched by
/// the returned terminal watermarks before checkpoints can advance.
FencedRepairChunk { high: BTreeMap<u64, u64> },
/// Incremental delta query over `(checkpoint, scan-open snapshot]`.
IncrementalDelta,
}
impl QueryCoverage {
/// Whether this query should use incremental scan extensions and
/// incremental checkpoint advancement rules.
fn is_incremental_delta(&self) -> bool {
matches!(self, Self::IncrementalDelta)
}
/// Snapshot upper bounds requested from the storage layer. Only fenced
/// repair chunks carry bounds; all other coverage relies on normal scans.
fn snapshot_seqs(&self) -> HashMap<u64, u64> {
match self {
Self::FencedRepairChunk { high } => high.iter().map(|(k, v)| (*k, *v)).collect(),
_ => HashMap::new(),
}
}
}
pub enum DirtyRestore {
@@ -379,7 +414,8 @@ impl BatchingTask {
.execute_logical_plan_unlocked(
frontend_client,
&new_query.plan,
new_query.can_advance_checkpoints,
&new_query.dirty_restore,
&new_query.coverage,
)
.await;
if res.is_err() {
@@ -466,7 +502,7 @@ impl BatchingTask {
let insert_into_info = PlanInfo {
plan,
dirty_restore: new_query.dirty_restore,
can_advance_checkpoints: new_query.can_advance_checkpoints,
coverage: new_query.coverage,
};
let insert_into =
match insert_into_info
@@ -486,7 +522,7 @@ impl BatchingTask {
Ok(Some(PlanInfo {
plan: insert_into,
dirty_restore: insert_into_info.dirty_restore,
can_advance_checkpoints: insert_into_info.can_advance_checkpoints,
coverage: insert_into_info.coverage,
}))
}
@@ -508,7 +544,8 @@ impl BatchingTask {
&self,
frontend_client: &Arc<FrontendClient>,
plan: &LogicalPlan,
can_advance_checkpoints: bool,
dirty_restore: &DirtyRestore,
coverage: &QueryCoverage,
) -> Result<Option<(usize, Duration)>, Error> {
let instant = Instant::now();
let flow_id = self.config.flow_id;
@@ -540,16 +577,24 @@ impl BatchingTask {
// For incremental-mode SQL queries, attempt to rewrite the delta aggregate
// plan into a safe delta-LEFT-JOIN-sink form before deciding on extensions.
let incremental_plan = if can_advance_checkpoints {
let incremental_plan = if coverage.is_incremental_delta() {
self.prepare_plan_for_incremental(&plan).await?
} else {
None
};
let incremental_safe = incremental_plan.is_some();
if coverage.is_incremental_delta() && !incremental_safe {
warn!(
"Flow {flow_id} skipped unsafe incremental delta fallback; \
restored dirty signal instead of executing an unfiltered full snapshot"
);
self.restore_dirty_windows(dirty_restore);
return Ok(None);
}
let plan = incremental_plan.unwrap_or_else(|| plan.clone());
let extensions = self
.build_flow_query_extensions(incremental_safe, can_advance_checkpoints)
.build_flow_query_extensions(incremental_safe, coverage.is_incremental_delta())
.await?;
let extension_refs = extensions
.iter()
@@ -633,8 +678,16 @@ impl BatchingTask {
}
};
let snapshot_seqs = coverage.snapshot_seqs();
frontend_client
.query_with_terminal_metrics(catalog, schema, req, &extension_refs, &mut peer_desc)
.query_with_terminal_metrics(
catalog,
schema,
req,
&extension_refs,
&snapshot_seqs,
&mut peer_desc,
)
.await
};
@@ -650,8 +703,8 @@ impl BatchingTask {
);
let decision = {
let mut state = self.state.write().unwrap();
let reason = Self::query_failure_reason(err);
Self::apply_query_failure_to_state(&mut state, elapsed, reason)
let reason = Self::query_failure_reason(err, coverage);
Self::apply_query_failure_to_state(&mut state, elapsed, coverage, reason)
};
if let Some(decision) = decision {
Self::record_checkpoint_decision(flow_id, decision);
@@ -680,16 +733,11 @@ impl BatchingTask {
METRIC_FLOW_ROWS
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
.inc_by(affected_rows as _);
{
let decision = {
let mut state = self.state.write().unwrap();
let decision = Self::apply_query_result_to_state(
&mut state,
&res,
elapsed,
can_advance_checkpoints,
);
Self::record_checkpoint_decision(flow_id, decision);
}
Self::apply_query_result_to_state(&mut state, &res, elapsed, coverage)
};
Self::record_checkpoint_decision(flow_id, decision);
Ok(Some((affected_rows, elapsed)))
}
@@ -697,8 +745,8 @@ impl BatchingTask {
/// Restore dirty windows consumed by a failed query so they are retried on
/// the next execution.
///
fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) {
match &query.dirty_restore {
fn restore_dirty_windows(&self, dirty_restore: &DirtyRestore) {
match dirty_restore {
DirtyRestore::Scoped(filter) => self.restore_scoped_dirty_windows(filter),
DirtyRestore::Unscoped(dirty_windows) => self
.state
@@ -709,14 +757,20 @@ impl BatchingTask {
}
}
fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) {
self.state
.write()
.unwrap()
.dirty_time_windows
.add_windows(filter.time_ranges.clone());
/// Restore the dirty signal for a plan that was generated but failed before
/// it could prove any checkpoint advancement.
fn restore_dirty_windows_after_failure(&self, query: &PlanInfo) {
self.restore_dirty_windows(&query.dirty_restore);
}
/// Restore scoped windows through `TaskState` so fenced repair can decide
/// whether they go back to pending repair or live dirty state.
fn restore_scoped_dirty_windows(&self, filter: &FilterExprInfo) {
self.state.write().unwrap().restore_scoped_windows(filter);
}
/// Run a fallible scoped operation and restore its consumed windows if plan
/// generation/rewrite fails before execution.
fn restore_scoped_dirty_windows_on_err<T>(
&self,
filter: &FilterExprInfo,
@@ -727,6 +781,8 @@ impl BatchingTask {
})
}
/// Restore an unscoped dirty signal consumed by an explicit full-query or
/// incremental-delta plan.
fn restore_unscoped_dirty_windows(&self, dirty_windows: &DirtyTimeWindows) {
self.state
.write()
@@ -735,6 +791,8 @@ impl BatchingTask {
.add_dirty_windows(dirty_windows);
}
/// Run a fallible unscoped operation and restore the dirty signal if it
/// fails before a query is executed.
fn restore_unscoped_dirty_windows_on_err<T>(
&self,
dirty_windows: &DirtyTimeWindows,
@@ -745,6 +803,8 @@ impl BatchingTask {
})
}
/// Consume the live dirty signal for an unscoped query while keeping a copy
/// that can be restored if planning or execution fails.
fn drain_dirty_windows_signal(&self) -> (bool, DirtyTimeWindows) {
let mut state = self.state.write().unwrap();
let dirty_windows_to_restore = state.dirty_time_windows.clone();
@@ -754,6 +814,8 @@ impl BatchingTask {
}
#[allow(clippy::too_many_arguments)]
/// Build an unfiltered plan for explicit full-query or incremental-delta
/// coverage. Callers pass the consumed dirty signal for failure restoration.
async fn gen_unfiltered_plan_info(
&self,
engine: QueryEngineRef,
@@ -763,6 +825,7 @@ impl BatchingTask {
allow_partial: bool,
dirty_windows_to_restore: DirtyTimeWindows,
retention_filter: Option<(&str, Timestamp, &'static str)>,
coverage: QueryCoverage,
) -> Result<PlanInfo, Error> {
let mut plan = self.restore_unscoped_dirty_windows_on_err(
&dirty_windows_to_restore,
@@ -801,10 +864,13 @@ impl BatchingTask {
Ok(PlanInfo {
plan,
dirty_restore: DirtyRestore::Unscoped(dirty_windows_to_restore),
can_advance_checkpoints: true,
coverage,
})
}
#[allow(clippy::too_many_arguments)]
/// Build an unfiltered plan only when the live dirty signal was present;
/// otherwise skip this round without querying.
async fn gen_unfiltered_plan_info_if_dirty(
&self,
engine: QueryEngineRef,
@@ -813,6 +879,7 @@ impl BatchingTask {
primary_key_indices: &[usize],
allow_partial: bool,
retention_filter: Option<(&str, Timestamp, &'static str)>,
coverage: QueryCoverage,
) -> Result<Option<PlanInfo>, Error> {
let (is_dirty, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
if !is_dirty {
@@ -828,6 +895,7 @@ impl BatchingTask {
allow_partial,
dirty_windows_to_restore,
retention_filter,
coverage,
)
.await
.map(Some)
@@ -973,6 +1041,8 @@ impl BatchingTask {
create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
}
/// Incremental delta scans are unfiltered by dirty windows; the sequence
/// range, not a time predicate, defines source correctness.
fn should_use_unfiltered_incremental_delta(&self) -> bool {
let state = self.state.read().unwrap();
state.checkpoint_mode() == CheckpointMode::Incremental
@@ -980,14 +1050,8 @@ impl BatchingTask {
&& matches!(self.config.query_type, QueryType::Sql)
}
fn should_use_unfiltered_full_snapshot_seeding(&self) -> bool {
let state = self.state.read().unwrap();
state.checkpoint_mode() == CheckpointMode::FullSnapshot
&& !state.is_incremental_disabled()
&& matches!(self.config.query_type, QueryType::Sql)
}
/// will merge and use the first ten time window in query
/// Generate the next plan and classify its coverage so checkpoint handling
/// knows whether it is full-query, scoped repair, fenced repair, or delta.
async fn gen_query_with_time_window(
&self,
engine: QueryEngineRef,
@@ -1016,49 +1080,44 @@ impl BatchingTask {
.map(|expr| expr.eval(low_bound))
.transpose()?;
let (expire_lower_bound, expire_upper_bound) =
match (expire_time_window_bound, &self.config.query_type) {
(Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
(None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => {
// if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns)
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
debug!(
"Flow id = {:?}, no time window, using the same query",
let (expire_lower_bound, expire_upper_bound) = match (
expire_time_window_bound,
&self.config.query_type,
) {
(Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
(None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => {
return UnexpectedSnafu {
reason: format!(
"Flow id={} reached runtime without a time-window expression or EVAL INTERVAL; create-flow validation should have rejected it",
self.config.flow_id
);
// clean dirty time window too, this could be from create flow's check_execute
return self
.gen_unfiltered_plan_info_if_dirty(
engine,
query_ctx,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
None,
)
.await;
),
}
_ => {
// Clean dirty windows for full-query/non-scoped paths,
// such as TQL or evaluation-interval SQL without a recognized
// time-window expression, that cannot use a time-window filter.
let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
.fail();
}
_ => {
// Explicit full-query flows (TQL and evaluation-interval SQL
// plans whose shape cannot be safely dirty-window pruned) are
// allowed to run as unfiltered full snapshots. This is distinct
// from using unfiltered full as a fallback after scoped repair or
// incremental rewrite failed.
let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
let plan_info = self
.gen_unfiltered_plan_info(
engine,
query_ctx,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
dirty_windows_to_restore,
None,
)
.await?;
let plan_info = self
.gen_unfiltered_plan_info(
engine,
query_ctx,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
dirty_windows_to_restore,
None,
QueryCoverage::UnfilteredFull,
)
.await?;
return Ok(Some(plan_info));
}
};
return Ok(Some(plan_info));
}
};
debug!(
"Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
@@ -1086,31 +1145,6 @@ impl BatchingTask {
),
})?;
if self.should_use_unfiltered_full_snapshot_seeding() {
// A full-snapshot query that can seed/refresh incremental
// checkpoints must not use dirty-window predicates. Rows can be
// written after dirty windows are drained but before the source scan
// snapshot opens; a stale dirty-window filter could exclude those
// rows while the returned watermark includes them, causing the next
// incremental read to skip them forever. Execute an unfiltered full
// snapshot instead, and keep dirty windows only as the scheduling and
// failure-restoration signal.
let retention_filter = self
.config
.expire_after
.map(|_| (col_name.as_str(), expire_lower_bound, "full-snapshot"));
return self
.gen_unfiltered_plan_info_if_dirty(
engine,
query_ctx,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
retention_filter,
)
.await;
}
if self.should_use_unfiltered_incremental_delta() {
// In incremental mode, source correctness is defined by the
// per-region sequence range `(checkpoint, scan-open snapshot]`, not
@@ -1133,15 +1167,16 @@ impl BatchingTask {
primary_key_indices,
allow_partial,
retention_filter,
QueryCoverage::IncrementalDelta,
)
.await;
}
let (expr, can_advance_checkpoints) = {
let (expr, coverage) = {
let mut state = self.state.write().unwrap();
let window_cnt = max_window_cnt
.unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query);
let expr = state.dirty_time_windows.gen_filter_exprs(
let expr = state.gen_scoped_filter_exprs(
&col_name,
Some(expire_lower_bound),
window_size,
@@ -1149,8 +1184,15 @@ impl BatchingTask {
self.config.flow_id,
Some(self),
)?;
let can_advance_checkpoints = state.dirty_time_windows.is_empty();
(expr, can_advance_checkpoints)
let repair_high = state
.pending_fenced_repair()
.map(|repair| repair.high().clone());
let coverage = if let Some(high) = repair_high {
QueryCoverage::FencedRepairChunk { high }
} else {
QueryCoverage::ScopedBaseRepair
};
(expr, coverage)
};
let Some(expr) = expr else {
@@ -1198,7 +1240,7 @@ impl BatchingTask {
let info = PlanInfo {
plan: new_plan.clone(),
dirty_restore: DirtyRestore::Scoped(expr),
can_advance_checkpoints,
coverage,
};
Ok(Some(info))

View File

@@ -24,73 +24,169 @@ use crate::batching_mode::checkpoint::{
FlowCheckpointDecision, FlowQueryFallbackReason, checkpoint_mode_label,
};
use crate::batching_mode::state::{CheckpointMode, TaskState};
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::task::{BatchingTask, QueryCoverage};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_CHECKPOINT_DECISION_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_MODE_CNT,
};
use crate::{Error, FlowId};
impl BatchingTask {
pub(super) fn query_failure_reason(err: &Error) -> FlowQueryFallbackReason {
/// Classify execution errors into checkpoint fallback reasons. A stale
/// snapshot fence is special only for fenced repair chunks.
pub(super) fn query_failure_reason(
err: &Error,
coverage: &QueryCoverage,
) -> FlowQueryFallbackReason {
if err.status_code() == StatusCode::RequestOutdated {
FlowQueryFallbackReason::StaleCursor
} else {
if matches!(coverage, QueryCoverage::FencedRepairChunk { .. }) {
FlowQueryFallbackReason::SnapshotFenceExpired
} else {
FlowQueryFallbackReason::StaleCursor
}
} else if matches!(coverage, QueryCoverage::IncrementalDelta) {
FlowQueryFallbackReason::IncrementalQueryFailure
} else {
FlowQueryFallbackReason::QueryFailure
}
}
/// Apply the conservative state transition for a failed query. This never
/// advances checkpoints; it only decides whether to abandon repair state or
/// fall back from incremental mode.
pub(super) fn apply_query_failure_to_state(
state: &mut TaskState,
elapsed: Duration,
coverage: &QueryCoverage,
reason: FlowQueryFallbackReason,
) -> Option<FlowCheckpointDecision> {
state.after_query_exec(elapsed, false);
let checkpoint_mode = state.checkpoint_mode();
if checkpoint_mode == CheckpointMode::Incremental {
state.mark_full_snapshot();
Some(FlowCheckpointDecision::FallbackToFullSnapshot {
if matches!(coverage, QueryCoverage::FencedRepairChunk { .. })
&& matches!(reason, FlowQueryFallbackReason::SnapshotFenceExpired)
{
// `abandon_fenced_repair()` restores the not-yet-in-flight pending
// windows from the stale repair back to live dirty windows. The
// currently executing chunk is restored separately by the outer
// failure path (`handle_executed_query_failure`), after this state
// transition has removed `pending_fenced_repair`, so it also lands
// in live dirty windows for the next scoped base repair.
state.abandon_fenced_repair();
return Some(FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason,
})
} else {
None
});
}
if checkpoint_mode == CheckpointMode::Incremental {
state.mark_full_snapshot();
}
Some(FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason,
})
}
/// Apply checkpoint transitions for a successfully executed query using its
/// terminal watermark proof and declared coverage.
pub(super) fn apply_query_result_to_state(
state: &mut TaskState,
res: &OutputWithMetrics,
elapsed: Duration,
can_advance_checkpoints: bool,
coverage: &QueryCoverage,
) -> FlowCheckpointDecision {
state.after_query_exec(elapsed, true);
let checkpoint_mode = state.checkpoint_mode();
if !can_advance_checkpoints {
state.mark_full_snapshot();
return FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::DirtyBacklogPending,
};
}
if let (Some(participating_regions), Some(watermark_map)) =
(res.participating_regions(), res.region_watermark_map())
{
let can_advance = match checkpoint_mode {
CheckpointMode::FullSnapshot => state
.can_advance_full_snapshot_checkpoints(&participating_regions, &watermark_map),
CheckpointMode::Incremental => state
.can_advance_incremental_checkpoints_with_participation(
let participating_region_count = participating_regions.len();
let watermark_count = watermark_map.len();
match coverage {
QueryCoverage::ScopedBaseRepair => {
if !state.can_advance_full_snapshot_checkpoints(
&participating_regions,
&watermark_map,
),
};
) {
return FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
};
}
if can_advance {
let participating_region_count = participating_regions.len();
let watermark_count = watermark_map.len();
match checkpoint_mode {
CheckpointMode::FullSnapshot => {
if state.is_incremental_disabled() {
return FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::FullSnapshot,
reason: FlowQueryFallbackReason::IncrementalDisabled,
};
}
if state.dirty_time_windows.is_empty() {
state.advance_checkpoints(watermark_map);
FlowCheckpointDecision::AdvancedFromFullSnapshot {
participating_regions: participating_region_count,
watermarks: watermark_count,
}
} else if let Some(repair) =
state.start_fenced_repair(watermark_map.into_iter().collect())
{
FlowCheckpointDecision::ContinuedFencedRepair {
pending_windows: repair.pending_windows().len(),
watermarks: repair.high().len(),
}
} else {
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::DirtyBacklogPending,
}
}
}
QueryCoverage::FencedRepairChunk { .. } => {
if !state
.fenced_repair_watermarks_match_high(&participating_regions, &watermark_map)
{
// A successful repair chunk whose terminal watermark
// differs from the frozen fence `H` cannot prove that
// continuing the same repair is safe. The pre-`H`
// repair work item already executed under the snapshot
// fence, so do not requeue it; restore only
// not-yet-in-flight pending windows, then start over
// with a fresh H. Later/post-`H` writes still rely on
// live dirty notifications.
state.abandon_fenced_repair();
return FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
};
}
if state.fenced_repair_pending_is_empty() {
state.finish_fenced_repair();
if state.is_incremental_disabled() {
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: CheckpointMode::FullSnapshot,
reason: FlowQueryFallbackReason::IncrementalDisabled,
}
} else {
FlowCheckpointDecision::AdvancedFromFullSnapshot {
participating_regions: participating_region_count,
watermarks: watermark_count,
}
}
} else {
let repair = state
.pending_fenced_repair()
.expect("fenced repair exists after matching repair chunk watermark");
FlowCheckpointDecision::ContinuedFencedRepair {
pending_windows: repair.pending_windows().len(),
watermarks: repair.high().len(),
}
}
}
QueryCoverage::UnfilteredFull => {
if state.can_advance_full_snapshot_checkpoints(
&participating_regions,
&watermark_map,
) {
state.advance_checkpoints(watermark_map);
if state.is_incremental_disabled() {
FlowCheckpointDecision::FallbackToFullSnapshot {
@@ -103,8 +199,19 @@ impl BatchingTask {
watermarks: watermark_count,
}
}
} else {
debug_assert_ne!(checkpoint_mode, CheckpointMode::Incremental);
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
}
}
CheckpointMode::Incremental => {
}
QueryCoverage::IncrementalDelta => {
if state.can_advance_incremental_checkpoints_with_participation(
&participating_regions,
&watermark_map,
) {
state.advance_incremental_checkpoints_with_participation(
&participating_regions,
watermark_map,
@@ -113,17 +220,22 @@ impl BatchingTask {
participating_regions: participating_region_count,
watermarks: watermark_count,
}
} else {
state.mark_full_snapshot();
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
}
}
}
} else {
state.mark_full_snapshot();
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::IncompleteRegionWatermark,
}
}
} else {
state.mark_full_snapshot();
if matches!(coverage, QueryCoverage::FencedRepairChunk { .. }) {
state.abandon_fenced_repair();
}
if matches!(checkpoint_mode, CheckpointMode::Incremental) {
state.mark_full_snapshot();
}
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode: checkpoint_mode,
reason: FlowQueryFallbackReason::MissingRegionWatermark,
@@ -159,6 +271,14 @@ impl BatchingTask {
"Flow {flow_id} advanced incremental checkpoints, participating_regions={participating_regions}, watermarks={watermarks}"
);
}
FlowCheckpointDecision::ContinuedFencedRepair {
pending_windows,
watermarks,
} => {
debug!(
"Flow {flow_id} continued fenced repair, pending_windows={pending_windows}, watermarks={watermarks}"
);
}
FlowCheckpointDecision::FallbackToFullSnapshot {
previous_mode,
reason,

View File

@@ -64,12 +64,14 @@ impl BatchingTask {
/// For incremental-mode SQL queries, attempt to prepare an executable plan
/// that is safe for incremental scan extensions.
///
/// Returns `Some(plan)` when incremental extensions are safe, and `None`
/// when the caller should execute the original plan without incremental
/// extensions. The returned plan may be either a rewritten
/// delta-LEFT-JOIN-sink merge plan or the original plan. In particular,
/// plain GROUP BY queries with no aggregate merge columns are incremental
/// safe without a rewrite, so they return `Some(original_plan)`.
/// Returns `Some(plan)` when incremental extensions are safe. For an
/// incremental-delta query, `None` means the caller must not execute the
/// original plan without incremental extensions, because that would become
/// an unfiltered full snapshot; the caller should restore the dirty signal
/// and skip the current round instead. The returned plan may be either a
/// rewritten delta-LEFT-JOIN-sink merge plan or the original plan. In
/// particular, plain GROUP BY queries with no aggregate merge columns are
/// incremental safe without a rewrite, so they return `Some(original_plan)`.
pub(super) async fn prepare_plan_for_incremental(
&self,
plan: &LogicalPlan,
@@ -131,8 +133,10 @@ impl BatchingTask {
// Fetch sink table for the merge rewrite.
// Transient errors (catalog, schema, filter, or rewrite) should not
// permanently disable incremental mode. Instead, we fall back to a
// full-snapshot plan for this round while keeping incremental retryable.
// permanently disable incremental mode. They also must not execute the
// original plan without incremental extensions, because that would be an
// unfiltered full snapshot. The caller will restore the dirty signal and
// skip this round while keeping incremental retryable.
let sink_table = match get_table_info_df_schema(
self.config.catalog_manager.clone(),
self.config.sink_table_name.clone(),
@@ -143,10 +147,9 @@ impl BatchingTask {
Err(err) => {
warn!(
"Flow {} failed to fetch sink table for incremental rewrite; \
falling back to full snapshot for this round: {:?}",
skipping this round to avoid unfiltered full snapshot: {:?}",
self.config.flow_id, err
);
self.state.write().unwrap().mark_full_snapshot();
return Ok(None);
}
};
@@ -163,10 +166,9 @@ impl BatchingTask {
Err(err) => {
warn!(
"Flow {} failed to rewrite incremental aggregate with sink merge; \
falling back to full snapshot for this round: {:?}",
skipping this round to avoid unfiltered full snapshot: {:?}",
self.config.flow_id, err
);
self.state.write().unwrap().mark_full_snapshot();
return Ok(None);
}
};

File diff suppressed because it is too large Load Diff

View File

@@ -77,6 +77,27 @@ mod test {
| 1970-01-01T00:00:00.009 | -9 | s9 |
+-------------------------+----+----+";
query_and_expect(db.frontend().as_ref(), sql, expected).await;
create_table_named(&client, "bar").await;
let result = client
.sql_with_terminal_metrics(
"insert into bar select ts, a, `B` from foo",
&[("flow.return_region_seq", "true")],
)
.await
.unwrap();
let OutputData::AffectedRows(affected_rows) = result.output.data else {
panic!("expected affected rows output");
};
assert_eq!(affected_rows, 9);
assert!(result.metrics.is_ready());
let region_watermark_map = result
.region_watermark_map()
.expect("standalone affected-rows output should carry terminal region watermarks");
assert!(
!region_watermark_map.is_empty(),
"standalone affected-rows output should contain at least one region watermark"
);
}
#[tokio::test(flavor = "multi_thread")]

View File

@@ -10,7 +10,7 @@ Affected Rows: 0
-- should fallback to streaming mode
-- SQLNESS REPLACE id=\d+ id=REDACTED
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS
SELECT
DISTINCT number as dis
FROM
@@ -167,7 +167,7 @@ CREATE TABLE distinct_basic (
Affected Rows: 0
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS
SELECT
DISTINCT number as dis
FROM

View File

@@ -8,7 +8,7 @@ CREATE TABLE distinct_basic (
-- should fallback to streaming mode
-- SQLNESS REPLACE id=\d+ id=REDACTED
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS
SELECT
DISTINCT number as dis
FROM
@@ -71,7 +71,7 @@ CREATE TABLE distinct_basic (
TIME INDEX(ts)
)WITH ('ttl' = '5s');
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS
SELECT
DISTINCT number as dis
FROM
@@ -126,4 +126,4 @@ SELECT number FROM distinct_basic;
DROP FLOW test_distinct_basic;
DROP TABLE distinct_basic;
DROP TABLE out_distinct_basic;
DROP TABLE out_distinct_basic;

View File

@@ -166,7 +166,7 @@ CREATE TABLE input_basic (
Affected Rows: 0
CREATE FLOW test_wildcard_basic SiNk TO out_basic AS
CREATE FLOW test_wildcard_basic SiNk TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -196,7 +196,7 @@ DROP FLOW test_wildcard_basic;
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -297,7 +297,7 @@ CREATE TABLE distinct_basic (
Affected Rows: 0
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS
SELECT
DISTINCT number as dis
FROM
@@ -641,7 +641,7 @@ CREATE TABLE ngx_access_log (
Affected Rows: 0
-- create flow task to calculate the distinct country
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS
SELECT
DISTINCT country,
FROM
@@ -790,7 +790,7 @@ CREATE TABLE ngx_access_log (
Affected Rows: 0
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS
SELECT
DISTINCT country,
-- this distinct is not necessary, but it's a good test to see if it works
@@ -960,7 +960,7 @@ CREATE TABLE temp_alerts (
Affected Rows: 0
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
CREATE FLOW temp_monitoring SINK TO temp_alerts EVAL INTERVAL '1m' AS
SELECT
sensor_id,
loc,
@@ -1255,7 +1255,7 @@ CREATE TABLE requests_without_ip (
Affected Rows: 0
CREATE FLOW requests_long_term SINK TO requests_without_ip AS
CREATE FLOW requests_long_term SINK TO requests_without_ip EVAL INTERVAL '1m' AS
SELECT
service_name,
val,
@@ -1685,7 +1685,7 @@ CREATE TABLE numbers_input_basic (
Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic EVAL INTERVAL '1m' AS
SELECT
sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num
FROM
@@ -1883,7 +1883,7 @@ SELECT device_model,
+--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+
-- Create a flow with same source and sink table
CREATE FLOW same_source_and_sink_table SINK TO live_connection_log AS
CREATE FLOW same_source_and_sink_table SINK TO live_connection_log EVAL INTERVAL '1m' AS
SELECT
*
FROM

View File

@@ -75,7 +75,7 @@ CREATE TABLE input_basic (
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic SiNk TO out_basic AS
CREATE FLOW test_wildcard_basic SiNk TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -85,7 +85,7 @@ SHOW CREATE TABLE out_basic;
DROP FLOW test_wildcard_basic;
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -122,7 +122,7 @@ CREATE TABLE distinct_basic (
TIME INDEX(ts)
);
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic EVAL INTERVAL '1m' AS
SELECT
DISTINCT number as dis
FROM
@@ -284,7 +284,7 @@ CREATE TABLE ngx_access_log (
);
-- create flow task to calculate the distinct country
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS
SELECT
DISTINCT country,
FROM
@@ -346,7 +346,7 @@ CREATE TABLE ngx_access_log (
access_time TIMESTAMP TIME INDEX
);
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS
SELECT
DISTINCT country,
-- this distinct is not necessary, but it's a good test to see if it works
@@ -427,7 +427,7 @@ CREATE TABLE temp_alerts (
update_at TIMESTAMP
);
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
CREATE FLOW temp_monitoring SINK TO temp_alerts EVAL INTERVAL '1m' AS
SELECT
sensor_id,
loc,
@@ -586,7 +586,7 @@ CREATE TABLE requests_without_ip (
PRIMARY KEY(service_name)
);
CREATE FLOW requests_long_term SINK TO requests_without_ip AS
CREATE FLOW requests_long_term SINK TO requests_without_ip EVAL INTERVAL '1m' AS
SELECT
service_name,
val,
@@ -790,7 +790,7 @@ CREATE TABLE numbers_input_basic (
TIME INDEX(ts)
);
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic EVAL INTERVAL '1m' AS
SELECT
sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num
FROM
@@ -908,7 +908,7 @@ SELECT device_model,
record_time_window FROM live_connection_statistics_detail;
-- Create a flow with same source and sink table
CREATE FLOW same_source_and_sink_table SINK TO live_connection_log AS
CREATE FLOW same_source_and_sink_table SINK TO live_connection_log EVAL INTERVAL '1m' AS
SELECT
*
FROM

View File

@@ -9,7 +9,7 @@ CREATE TABLE input_basic (
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -64,7 +64,7 @@ CREATE TABLE input_basic (
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -142,7 +142,7 @@ DROP FLOW test_wildcard_basic;
Affected Rows: 0
-- recreate flow so that it use new table id
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -206,7 +206,7 @@ CREATE TABLE input_basic (
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -247,7 +247,7 @@ DROP TABLE out_basic;
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -304,7 +304,7 @@ CREATE TABLE input_basic (
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -371,7 +371,7 @@ CREATE TABLE input_basic (
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -474,7 +474,7 @@ DROP FLOW test_wildcard_basic;
Affected Rows: 0
-- recreate flow so that it use new table id
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -549,7 +549,7 @@ CREATE TABLE input_basic (
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -602,7 +602,7 @@ DROP TABLE out_basic;
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM

View File

@@ -7,7 +7,7 @@ CREATE TABLE input_basic (
append_mode = 'true'
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -39,7 +39,7 @@ CREATE TABLE input_basic (
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -81,7 +81,7 @@ SELECT wildcard FROM out_basic;
DROP FLOW test_wildcard_basic;
-- recreate flow so that it use new table id
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -113,7 +113,7 @@ CREATE TABLE input_basic (
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -135,7 +135,7 @@ DROP FLOW test_wildcard_basic;
DROP TABLE out_basic;
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -166,7 +166,7 @@ CREATE TABLE input_basic (
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -204,7 +204,7 @@ CREATE TABLE input_basic (
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -259,7 +259,7 @@ SELECT wildcard FROM out_basic;
DROP FLOW test_wildcard_basic;
-- recreate flow so that it use new table id
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -296,7 +296,7 @@ CREATE TABLE input_basic (
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM
@@ -323,7 +323,7 @@ DROP FLOW test_wildcard_basic;
DROP TABLE out_basic;
CREATE FLOW test_wildcard_basic sink TO out_basic AS
CREATE FLOW test_wildcard_basic sink TO out_basic EVAL INTERVAL '1m' AS
SELECT
COUNT(*) as wildcard
FROM

View File

@@ -232,7 +232,7 @@ CREATE TABLE ngx_country (
Affected Rows: 0
/* create flow task to calculate the distinct country */
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS
SELECT
DISTINCT country,
FROM
@@ -314,7 +314,7 @@ CREATE TABLE ngx_country (
Affected Rows: 0
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS
SELECT
DISTINCT country,
date_bin(INTERVAL '1 hour', access_time) as time_window,
@@ -403,7 +403,7 @@ CREATE TABLE temp_alerts (
Affected Rows: 0
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
CREATE FLOW temp_monitoring SINK TO temp_alerts EVAL INTERVAL '1m' AS
SELECT
sensor_id,
loc,

View File

@@ -184,7 +184,7 @@ CREATE TABLE ngx_country (
);
/* create flow task to calculate the distinct country */
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS
SELECT
DISTINCT country,
FROM
@@ -236,7 +236,7 @@ CREATE TABLE ngx_country (
PRIMARY KEY(country)
);
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
CREATE FLOW calc_ngx_country SINK TO ngx_country EVAL INTERVAL '1m' AS
SELECT
DISTINCT country,
date_bin(INTERVAL '1 hour', access_time) as time_window,
@@ -295,7 +295,7 @@ CREATE TABLE temp_alerts (
PRIMARY KEY(sensor_id, loc)
);
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
CREATE FLOW temp_monitoring SINK TO temp_alerts EVAL INTERVAL '1m' AS
SELECT
sensor_id,
loc,
@@ -411,4 +411,4 @@ DROP FLOW calc_ngx_distribution;
DROP TABLE ngx_distribution;
DROP TABLE ngx_access_log;
DROP TABLE ngx_access_log;

View File

@@ -20,7 +20,7 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
Affected Rows: 4
CREATE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent) AS total_count FROM ngx_access_log GROUP BY user_agent;
CREATE FLOW user_agent_flow SINK TO user_agent_statistics EVAL INTERVAL '1m' AS SELECT user_agent, COUNT(user_agent) AS total_count FROM ngx_access_log GROUP BY user_agent;
Affected Rows: 0
@@ -32,7 +32,7 @@ SELECT created_time <= updated_time, created_time IS NOT NULL, source_table_name
| true | true | greptime.public.ngx_access_log |
+--------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------+
CREATE OR REPLACE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent)+123 AS total_count FROM ngx_access_log GROUP BY user_agent;
CREATE OR REPLACE FLOW user_agent_flow SINK TO user_agent_statistics EVAL INTERVAL '1m' AS SELECT user_agent, COUNT(user_agent)+123 AS total_count FROM ngx_access_log GROUP BY user_agent;
Affected Rows: 0

View File

@@ -10,11 +10,11 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
-- SQLNESS SLEEP 1s
INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z');
CREATE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent) AS total_count FROM ngx_access_log GROUP BY user_agent;
CREATE FLOW user_agent_flow SINK TO user_agent_statistics EVAL INTERVAL '1m' AS SELECT user_agent, COUNT(user_agent) AS total_count FROM ngx_access_log GROUP BY user_agent;
SELECT created_time <= updated_time, created_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';
CREATE OR REPLACE FLOW user_agent_flow SINK TO user_agent_statistics AS SELECT user_agent, COUNT(user_agent)+123 AS total_count FROM ngx_access_log GROUP BY user_agent;
CREATE OR REPLACE FLOW user_agent_flow SINK TO user_agent_statistics EVAL INTERVAL '1m' AS SELECT user_agent, COUNT(user_agent)+123 AS total_count FROM ngx_access_log GROUP BY user_agent;
SELECT created_time < updated_time, created_time IS NOT NULL, updated_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';