feat: detect stale query cursor on flownode

Signed-off-by: discord9 <discord9@163.com>
discord9
2026-03-17 12:23:28 +08:00
parent da6eb85bbb
commit bc768617fb
2 changed files with 256 additions and 3 deletions

View File

@@ -179,6 +179,27 @@ pub struct DatabaseWithPeer {
pub peer: Peer,
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FlowQueryFailure {
pub stale_cursor: Option<FlowStaleCursorDetail>,
}
impl FlowQueryFailure {
pub fn is_stale_cursor(&self) -> bool {
self.stale_cursor.is_some()
}
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FlowStaleCursorDetail {
pub region_id: Option<String>,
pub given_seq: Option<u64>,
pub min_readable_seq: Option<u64>,
}
const STALE_CURSOR_TOKEN: &str = "STALE_CURSOR";
const STALE_CURSOR_RETRY_HINT: &str = "FALLBACK_FULL_RECOMPUTE";
impl DatabaseWithPeer {
fn new(database: Database, peer: Peer) -> Self {
Self { database, peer }
@@ -199,6 +220,13 @@ impl DatabaseWithPeer {
}
impl FrontendClient {
/// TODO(discord9): better way to detect stale cursor error instead of parsing the error message
pub fn inspect_query_error(err: &Error) -> FlowQueryFailure {
let debug = format!("{err:?}");
let stale_cursor = parse_stale_cursor_detail(&debug);
FlowQueryFailure { stale_cursor }
}
/// scan for available frontend from metadata
pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
@@ -491,11 +519,37 @@ impl std::fmt::Display for PeerDesc {
}
}
fn parse_stale_cursor_detail(message: &str) -> Option<FlowStaleCursorDetail> {
if !message.contains(STALE_CURSOR_TOKEN) || !message.contains(STALE_CURSOR_RETRY_HINT) {
return None;
}
Some(FlowStaleCursorDetail {
region_id: extract_segment(message, "region: ", ", given_seq:"),
given_seq: extract_u64_segment(message, "given_seq: ", ", min_readable_seq:"),
min_readable_seq: extract_u64_segment(message, "min_readable_seq: ", ", retry_hint:"),
})
}
fn extract_segment(message: &str, start: &str, end: &str) -> Option<String> {
let start_idx = message.find(start)? + start.len();
let tail = &message[start_idx..];
let end_idx = tail.find(end)?;
Some(tail[..end_idx].trim().to_string())
}
fn extract_u64_segment(message: &str, start: &str, end: &str) -> Option<u64> {
extract_segment(message, start, end)?.parse().ok()
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_error::ext::PlainError;
use common_error::status_code::StatusCode;
use common_query::Output;
use snafu::GenerateImplicitData;
use tokio::time::timeout;
use super::*;
@@ -559,4 +613,41 @@ mod tests {
.is_ok()
);
}
#[test]
fn test_inspect_query_error_detects_stale_cursor() {
let err = Error::External {
source: BoxedError::new(PlainError::new(
"STALE_CURSOR: incremental query stale, region: 4398046511104(1024, 0), given_seq: 9, min_readable_seq: 18, retry_hint: FALLBACK_FULL_RECOMPUTE".to_string(),
StatusCode::EngineExecuteQuery,
)),
location: snafu::Location::generate(),
};
let failure = FrontendClient::inspect_query_error(&err);
assert!(failure.is_stale_cursor());
assert_eq!(
failure.stale_cursor,
Some(FlowStaleCursorDetail {
region_id: Some("4398046511104(1024, 0)".to_string()),
given_seq: Some(9),
min_readable_seq: Some(18),
})
);
}
#[test]
fn test_inspect_query_error_ignores_non_stale_error() {
let err = Error::External {
source: BoxedError::new(PlainError::new(
"ordinary query failure".to_string(),
StatusCode::EngineExecuteQuery,
)),
location: snafu::Location::generate(),
};
let failure = FrontendClient::inspect_query_error(&err);
assert!(!failure.is_stale_cursor());
assert_eq!(failure.stale_cursor, None);
}
}
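The detection above is deliberately textual: `inspect_query_error` renders the error with `{:?}`, and the helpers only fire when both the `STALE_CURSOR` token and the `FALLBACK_FULL_RECOMPUTE` retry hint appear, slicing the named fields out of the message. A minimal sketch, not part of this commit, of those mechanics on the same message the tests use:

let message = "STALE_CURSOR: incremental query stale, region: 4398046511104(1024, 0), given_seq: 9, min_readable_seq: 18, retry_hint: FALLBACK_FULL_RECOMPUTE";
// `extract_segment` cuts the text between a start marker and the following end marker.
assert_eq!(
    extract_segment(message, "region: ", ", given_seq:").as_deref(),
    Some("4398046511104(1024, 0)")
);
// `extract_u64_segment` additionally parses the slice; a non-numeric slice yields None.
assert_eq!(extract_u64_segment(message, "given_seq: ", ", min_readable_seq:"), Some(9));
// Without both the token and the retry hint, detection bails out entirely.
assert_eq!(parse_stale_cursor_detail("ordinary query failure"), None);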

View File

@@ -455,6 +455,44 @@ impl BatchingTask {
Ok(Some((res, elapsed)))
}
fn handle_flow_query_failure(&self, err: &Error, query: Option<&PlanInfo>) -> bool {
let failure = FrontendClient::inspect_query_error(err);
if failure.is_stale_cursor() {
warn!(
"Flow {} detected stale incremental query failure, switching to non-incremental recompute semantics for current query scope: {:?}",
self.config.flow_id, failure.stale_cursor
);
// Note that we only mark all windows as dirty if the query itself has no time-window filter.
if query.is_none_or(|query| query.filter.is_none())
&& let Err(mark_err) = self.mark_all_windows_as_dirty()
{
warn!(
"Flow {} failed to mark all windows dirty after stale incremental query without time-window scope: {}",
self.config.flow_id, mark_err
);
}
true
} else {
false
}
}
fn restore_dirty_windows_after_failure(&self, query: &PlanInfo, is_stale_cursor: bool) {
if is_stale_cursor && query.filter.is_none() {
return;
}
self.state.write().unwrap().dirty_time_windows.add_windows(
query
.filter
.as_ref()
.map(|f| f.time_ranges.clone())
.unwrap_or_default(),
);
}
/// start executing query in a loop, break when receive shutdown signal
///
/// any error will be logged when executing query
@@ -558,6 +596,7 @@ impl BatchingTask {
}
// TODO(discord9): this error should have a better place to go; for now just print it. More context is also needed.
Err(err) => {
let is_stale_cursor = self.handle_flow_query_failure(&err, new_query.as_ref());
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
.inc();
@@ -565,9 +604,7 @@ impl BatchingTask {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
// Re-add the dirty windows since the query failed
self.state.write().unwrap().dirty_time_windows.add_windows(
query.filter.map(|f| f.time_ranges).unwrap_or_default(),
);
self.restore_dirty_windows_after_failure(&query, is_stale_cursor);
// TODO(discord9): add some backoff here? half the query time window or what
// backoff meaning use smaller `max_window_cnt` for next query
@@ -979,8 +1016,11 @@ fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
#[cfg(test)]
mod test {
use api::v1::column_def::try_as_column_schema;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use pretty_assertions::assert_eq;
use session::context::QueryContext;
use snafu::GenerateImplicitData;
use super::*;
use crate::test_utils::create_test_query_engine;
@@ -1106,4 +1146,126 @@ mod test {
assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
}
}
#[tokio::test]
async fn test_handle_flow_query_failure_marks_full_recompute_on_stale() {
let query_engine = create_test_query_engine();
let query_ctx = QueryContext::arc();
let plan = sql_to_df_plan(
query_ctx.clone(),
query_engine,
"SELECT number, ts FROM numbers_with_ts",
true,
)
.await
.unwrap();
let (_tx, rx) = tokio::sync::oneshot::channel();
let task = BatchingTask::try_new(TaskArgs {
flow_id: 42,
query: "SELECT number, ts FROM numbers_with_ts",
plan,
time_window_expr: None,
expire_after: None,
sink_table_name: [
"greptime".to_string(),
"public".to_string(),
"sink".to_string(),
],
source_table_names: vec![[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
]],
query_ctx,
catalog_manager: create_test_query_engine()
.engine_state()
.catalog_manager()
.clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
flow_eval_interval: None,
})
.unwrap();
let err = Error::External {
source: BoxedError::new(PlainError::new(
"STALE_CURSOR: incremental query stale, region: 4398046511104(1024, 0), given_seq: 9, min_readable_seq: 18, retry_hint: FALLBACK_FULL_RECOMPUTE".to_string(),
StatusCode::EngineExecuteQuery,
)),
location: snafu::Location::generate(),
};
task.handle_flow_query_failure(&err, None);
let state = task.state.read().unwrap();
assert_eq!(state.dirty_time_windows.len(), 1);
}
#[tokio::test]
async fn test_stale_failure_preserves_current_time_window_scope() {
let query_engine = create_test_query_engine();
let query_ctx = QueryContext::arc();
let plan = sql_to_df_plan(
query_ctx.clone(),
query_engine,
"SELECT number, ts FROM numbers_with_ts",
true,
)
.await
.unwrap();
let (_tx, rx) = tokio::sync::oneshot::channel();
let task = BatchingTask::try_new(TaskArgs {
flow_id: 43,
query: "SELECT number, ts FROM numbers_with_ts",
plan: plan.clone(),
time_window_expr: None,
expire_after: None,
sink_table_name: [
"greptime".to_string(),
"public".to_string(),
"sink".to_string(),
],
source_table_names: vec![[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
]],
query_ctx,
catalog_manager: create_test_query_engine()
.engine_state()
.catalog_manager()
.clone(),
shutdown_rx: rx,
batch_opts: Arc::new(BatchingModeOptions::default()),
flow_eval_interval: None,
})
.unwrap();
let err = Error::External {
source: BoxedError::new(PlainError::new(
"STALE_CURSOR: incremental query stale, region: 4398046511104(1024, 0), given_seq: 9, min_readable_seq: 18, retry_hint: FALLBACK_FULL_RECOMPUTE".to_string(),
StatusCode::EngineExecuteQuery,
)),
location: snafu::Location::generate(),
};
let query = PlanInfo {
plan,
filter: Some(FilterExprInfo {
expr: datafusion_expr::lit(true),
col_name: "ts".to_string(),
time_ranges: vec![(Timestamp::new_second(0), Timestamp::new_second(1))],
window_size: chrono::Duration::seconds(1),
}),
};
let is_stale_cursor = task.handle_flow_query_failure(&err, Some(&query));
task.restore_dirty_windows_after_failure(&query, is_stale_cursor);
let state = task.state.read().unwrap();
assert_eq!(state.dirty_time_windows.len(), 1);
assert_eq!(
state.dirty_time_windows.window_size(),
std::time::Duration::from_secs(1)
);
}
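Taken together, the two helpers in this file give the execution loop three outcomes when a query fails. A minimal sketch, not part of this commit, of that decision path, assuming `task`, `err`, and `new_query` as they appear in the loop above:

let is_stale_cursor = task.handle_flow_query_failure(&err, new_query.as_ref());
if let Some(query) = new_query {
    // Stale cursor and no time-window filter: handle_flow_query_failure already marked
    // every window dirty, so this call returns early and the next run is a full recompute.
    // Stale cursor with a filter: only the failed query's windows are re-added, keeping
    // the recompute scoped to the current time window.
    // Any other error: the failed query's windows are re-added for an ordinary retry.
    task.restore_dirty_windows_after_failure(&query, is_stale_cursor);
}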
}