diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs
index 8ec6de3a08..baf22e7a20 100644
--- a/src/flow/src/batching_mode/frontend_client.rs
+++ b/src/flow/src/batching_mode/frontend_client.rs
@@ -179,6 +179,27 @@ pub struct DatabaseWithPeer {
     pub peer: Peer,
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct FlowQueryFailure {
+    pub stale_cursor: Option<FlowStaleCursorDetail>,
+}
+
+impl FlowQueryFailure {
+    pub fn is_stale_cursor(&self) -> bool {
+        self.stale_cursor.is_some()
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct FlowStaleCursorDetail {
+    pub region_id: Option<String>,
+    pub given_seq: Option<u64>,
+    pub min_readable_seq: Option<u64>,
+}
+
+const STALE_CURSOR_TOKEN: &str = "STALE_CURSOR";
+const STALE_CURSOR_RETRY_HINT: &str = "FALLBACK_FULL_RECOMPUTE";
+
 impl DatabaseWithPeer {
     fn new(database: Database, peer: Peer) -> Self {
         Self { database, peer }
@@ -199,6 +220,13 @@ impl DatabaseWithPeer {
 }
 
 impl FrontendClient {
+    /// TODO(discord9): better way to detect stale cursor error instead of parsing the error message
+    pub fn inspect_query_error(err: &Error) -> FlowQueryFailure {
+        let debug = format!("{err:?}");
+        let stale_cursor = parse_stale_cursor_detail(&debug);
+        FlowQueryFailure { stale_cursor }
+    }
+
     /// scan for available frontend from metadata
     pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
         let Self::Distributed { meta_client, .. } = self else {
@@ -491,11 +519,37 @@ impl std::fmt::Display for PeerDesc {
     }
 }
 
+fn parse_stale_cursor_detail(message: &str) -> Option<FlowStaleCursorDetail> {
+    if !message.contains(STALE_CURSOR_TOKEN) || !message.contains(STALE_CURSOR_RETRY_HINT) {
+        return None;
+    }
+
+    Some(FlowStaleCursorDetail {
+        region_id: extract_segment(message, "region: ", ", given_seq:"),
+        given_seq: extract_u64_segment(message, "given_seq: ", ", min_readable_seq:"),
+        min_readable_seq: extract_u64_segment(message, "min_readable_seq: ", ", retry_hint:"),
+    })
+}
+
+fn extract_segment(message: &str, start: &str, end: &str) -> Option<String> {
+    let start_idx = message.find(start)? + start.len();
+    let tail = &message[start_idx..];
+    let end_idx = tail.find(end)?;
+    Some(tail[..end_idx].trim().to_string())
+}
+
+fn extract_u64_segment(message: &str, start: &str, end: &str) -> Option<u64> {
+    extract_segment(message, start, end)?.parse().ok()
+}
+
 #[cfg(test)]
 mod tests {
     use std::time::Duration;
 
+    use common_error::ext::PlainError;
+    use common_error::status_code::StatusCode;
     use common_query::Output;
+    use snafu::GenerateImplicitData;
     use tokio::time::timeout;
 
     use super::*;
@@ -559,4 +613,41 @@ mod tests {
             .is_ok()
         );
     }
+
+    #[test]
+    fn test_inspect_query_error_detects_stale_cursor() {
+        let err = Error::External {
+            source: BoxedError::new(PlainError::new(
+                "STALE_CURSOR: incremental query stale, region: 4398046511104(1024, 0), given_seq: 9, min_readable_seq: 18, retry_hint: FALLBACK_FULL_RECOMPUTE".to_string(),
+                StatusCode::EngineExecuteQuery,
+            )),
+            location: snafu::Location::generate(),
+        };
+
+        let failure = FrontendClient::inspect_query_error(&err);
+        assert!(failure.is_stale_cursor());
+        assert_eq!(
+            failure.stale_cursor,
+            Some(FlowStaleCursorDetail {
+                region_id: Some("4398046511104(1024, 0)".to_string()),
+                given_seq: Some(9),
+                min_readable_seq: Some(18),
+            })
+        );
+    }
+
+    #[test]
+    fn test_inspect_query_error_ignores_non_stale_error() {
+        let err = Error::External {
+            source: BoxedError::new(PlainError::new(
+                "ordinary query failure".to_string(),
+                StatusCode::EngineExecuteQuery,
+            )),
+            location: snafu::Location::generate(),
+        };
+
+        let failure = FrontendClient::inspect_query_error(&err);
+        assert!(!failure.is_stale_cursor());
+        assert_eq!(failure.stale_cursor, None);
+    }
 }
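A minimal sketch (not part of the patch itself) of how the marker-based detection above behaves on a representative message; it uses only the helpers introduced in this file, and the message text mirrors the one from the tests:

    // Illustration only: both STALE_CURSOR_TOKEN and STALE_CURSOR_RETRY_HINT
    // must be present, otherwise parse_stale_cursor_detail returns None.
    let msg = "STALE_CURSOR: incremental query stale, region: 4398046511104(1024, 0), given_seq: 9, min_readable_seq: 18, retry_hint: FALLBACK_FULL_RECOMPUTE";
    let detail = parse_stale_cursor_detail(msg).expect("both markers present");
    assert_eq!(detail.region_id.as_deref(), Some("4398046511104(1024, 0)"));
    assert_eq!(detail.given_seq, Some(9));
    assert_eq!(detail.min_readable_seq, Some(18));

A field that cannot be located simply stays None rather than failing the whole parse, which keeps detection tolerant of small changes in the message layout.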
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index 84c96cc7cd..dfd40945e2 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -455,6 +455,44 @@
         Ok(Some((res, elapsed)))
     }
 
+    fn handle_flow_query_failure(&self, err: &Error, query: Option<&PlanInfo>) -> bool {
+        let failure = FrontendClient::inspect_query_error(err);
+        if failure.is_stale_cursor() {
+            warn!(
+                "Flow {} detected stale incremental query failure, switching to non-incremental recompute semantics for current query scope: {:?}",
+                self.config.flow_id, failure.stale_cursor
+            );
+
+            // Note that we only mark all windows as dirty if the query itself has no time-window filter.
+            if query.is_none_or(|query| query.filter.is_none())
+                && let Err(mark_err) = self.mark_all_windows_as_dirty()
+            {
+                warn!(
+                    "Flow {} failed to mark all windows dirty after stale incremental query without time-window scope: {}",
+                    self.config.flow_id, mark_err
+                );
+            }
+
+            true
+        } else {
+            false
+        }
+    }
+
+    fn restore_dirty_windows_after_failure(&self, query: &PlanInfo, is_stale_cursor: bool) {
+        if is_stale_cursor && query.filter.is_none() {
+            return;
+        }
+
+        self.state.write().unwrap().dirty_time_windows.add_windows(
+            query
+                .filter
+                .as_ref()
+                .map(|f| f.time_ranges.clone())
+                .unwrap_or_default(),
+        );
+    }
+
     /// start executing query in a loop, break when receive shutdown signal
     ///
     /// any error will be logged when executing query
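The intended division of labor between these two helpers, summarized (a reading of the code above, not an authoritative spec):

    // stale cursor? | time-window filter? | effect on dirty windows
    // --------------+---------------------+--------------------------------------------------
    // yes           | none (or no query)  | mark_all_windows_as_dirty(); restore is a no-op
    // yes           | present             | restore re-adds the query's own time_ranges
    // no            | either              | restore re-adds the query's own time_ranges

In other words, a stale cursor escalates to a full recompute only when the failed query had no time-window scope to fall back on; otherwise the failed scope is re-queued like any other failure.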
@@ -558,6 +596,7 @@
             }
             // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
             Err(err) => {
+                let is_stale_cursor = self.handle_flow_query_failure(&err, new_query.as_ref());
                 METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
                     .with_label_values(&[&flow_id_str])
                     .inc();
@@ -565,9 +604,7 @@
                     Some(query) => {
                         common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
                         // Re-add dirty windows back since query failed
-                        self.state.write().unwrap().dirty_time_windows.add_windows(
-                            query.filter.map(|f| f.time_ranges).unwrap_or_default(),
-                        );
+                        self.restore_dirty_windows_after_failure(&query, is_stale_cursor);
 
                         // TODO(discord9): add some backoff here? half the query time window or what
                         // backoff meaning use smaller `max_window_cnt` for next query
@@ -979,8 +1016,11 @@ fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Vec<String>, Error> {
 #[cfg(test)]
 mod test {
     use api::v1::column_def::try_as_column_schema;
+    use common_error::ext::{BoxedError, PlainError};
+    use common_error::status_code::StatusCode;
     use pretty_assertions::assert_eq;
     use session::context::QueryContext;
+    use snafu::GenerateImplicitData;
 
     use super::*;
     use crate::test_utils::create_test_query_engine;
@@ -1106,4 +1146,126 @@
             assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
         }
     }
+
+    #[tokio::test]
+    async fn test_handle_flow_query_failure_marks_full_recompute_on_stale() {
+        let query_engine = create_test_query_engine();
+        let query_ctx = QueryContext::arc();
+        let plan = sql_to_df_plan(
+            query_ctx.clone(),
+            query_engine,
+            "SELECT number, ts FROM numbers_with_ts",
+            true,
+        )
+        .await
+        .unwrap();
+        let (_tx, rx) = tokio::sync::oneshot::channel();
+        let task = BatchingTask::try_new(TaskArgs {
+            flow_id: 42,
+            query: "SELECT number, ts FROM numbers_with_ts",
+            plan,
+            time_window_expr: None,
+            expire_after: None,
+            sink_table_name: [
+                "greptime".to_string(),
+                "public".to_string(),
+                "sink".to_string(),
+            ],
+            source_table_names: vec![[
+                "greptime".to_string(),
+                "public".to_string(),
+                "numbers_with_ts".to_string(),
+            ]],
+            query_ctx,
+            catalog_manager: create_test_query_engine()
+                .engine_state()
+                .catalog_manager()
+                .clone(),
+            shutdown_rx: rx,
+            batch_opts: Arc::new(BatchingModeOptions::default()),
+            flow_eval_interval: None,
+        })
+        .unwrap();
+
+        let err = Error::External {
+            source: BoxedError::new(PlainError::new(
+                "STALE_CURSOR: incremental query stale, region: 4398046511104(1024, 0), given_seq: 9, min_readable_seq: 18, retry_hint: FALLBACK_FULL_RECOMPUTE".to_string(),
+                StatusCode::EngineExecuteQuery,
+            )),
+            location: snafu::Location::generate(),
+        };
+
+        task.handle_flow_query_failure(&err, None);
+
+        let state = task.state.read().unwrap();
+        assert_eq!(state.dirty_time_windows.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_stale_failure_preserves_current_time_window_scope() {
+        let query_engine = create_test_query_engine();
+        let query_ctx = QueryContext::arc();
+        let plan = sql_to_df_plan(
+            query_ctx.clone(),
+            query_engine,
+            "SELECT number, ts FROM numbers_with_ts",
+            true,
+        )
+        .await
+        .unwrap();
+        let (_tx, rx) = tokio::sync::oneshot::channel();
+        let task = BatchingTask::try_new(TaskArgs {
+            flow_id: 43,
+            query: "SELECT number, ts FROM numbers_with_ts",
+            plan: plan.clone(),
+            time_window_expr: None,
+            expire_after: None,
+            sink_table_name: [
+                "greptime".to_string(),
+                "public".to_string(),
+                "sink".to_string(),
+            ],
+            source_table_names: vec![[
+                "greptime".to_string(),
+                "public".to_string(),
+                "numbers_with_ts".to_string(),
+            ]],
+            query_ctx,
+            catalog_manager: create_test_query_engine()
+                .engine_state()
+                .catalog_manager()
+                .clone(),
+            shutdown_rx: rx,
+            batch_opts: Arc::new(BatchingModeOptions::default()),
+            flow_eval_interval: None,
+        })
+        .unwrap();
+
+        let err = Error::External {
+            source: BoxedError::new(PlainError::new(
+                "STALE_CURSOR: incremental query stale, region: 4398046511104(1024, 0), given_seq: 9, min_readable_seq: 18, retry_hint: FALLBACK_FULL_RECOMPUTE".to_string(),
+                StatusCode::EngineExecuteQuery,
+            )),
+            location: snafu::Location::generate(),
+        };
+
+        let query = PlanInfo {
+            plan,
+            filter: Some(FilterExprInfo {
+                expr: datafusion_expr::lit(true),
+                col_name: "ts".to_string(),
+                time_ranges: vec![(Timestamp::new_second(0), Timestamp::new_second(1))],
+                window_size: chrono::Duration::seconds(1),
+            }),
+        };
+        let is_stale_cursor = task.handle_flow_query_failure(&err, Some(&query));
+        task.restore_dirty_windows_after_failure(&query, is_stale_cursor);
+
+        let state = task.state.read().unwrap();
+        assert_eq!(state.dirty_time_windows.len(), 1);
+        assert_eq!(
+            state.dirty_time_windows.window_size(),
+            std::time::Duration::from_secs(1)
+        );
+    }
 }
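Taken together, the error path in the execution loop now reads roughly as follows (a condensed paraphrase of the hunks above, with the surrounding match arms and metrics elided):

    // Inside the Err(err) arm of the query loop; names as in the patch.
    let is_stale_cursor = self.handle_flow_query_failure(&err, new_query.as_ref());
    if let Some(query) = new_query {
        // Re-add the failed scope's dirty windows unless a stale cursor
        // already escalated this run to a full recompute.
        self.restore_dirty_windows_after_failure(&query, is_stale_cursor);
    }

Detection still relies on string-matching the error's Debug output (see the TODO on inspect_query_error), so STALE_CURSOR_TOKEN and STALE_CURSOR_RETRY_HINT must stay in sync with whatever formats the stale-cursor message on the server side.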