From 9c106ebda695c24a80e84801ba8b66cc1b969d3d Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 6 May 2026 12:35:17 +0800 Subject: [PATCH] more per review Signed-off-by: discord9 --- src/client/src/database.rs | 13 +++++++---- src/flow/src/batching_mode/frontend_client.rs | 17 ++++---------- src/query/src/metrics.rs | 23 +++++++++++++++---- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/client/src/database.rs b/src/client/src/database.rs index d065eae273..96905af786 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -352,10 +352,15 @@ where }; } FlightMessage::AffectedRows { .. } | FlightMessage::Schema(_) => { - yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)} - .fail() - .map_err(BoxedError::new) - .context(ExternalSnafu); + yield IllegalFlightMessagesSnafu { + reason: format!( + "A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", + flight_message + ) + } + .fail() + .map_err(BoxedError::new) + .context(ExternalSnafu); break; } } diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 9875564c78..c29c52846b 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -196,6 +196,9 @@ impl DatabaseWithPeer { } impl FrontendClient { + // TODO: support more fine-grained load balancing strategies for frontend + // selection, such as AZ (availability zone) awareness, to prefer frontends + // in the same zone as the flownode and reduce cross-AZ latency. /// scan for available frontend from metadata pub(crate) async fn scan_for_frontend(&self) -> Result, Error> { let Self::Distributed { meta_client, .. } = self else { @@ -314,12 +317,7 @@ impl FrontendClient { database_client .handler .lock() - .map_err(|e| { - UnexpectedSnafu { - reason: format!("Failed to lock database client: {e}"), - } - .build() - })? + .unwrap() .as_ref() .context(UnexpectedSnafu { reason: "Standalone's frontend instance is not set", @@ -392,12 +390,7 @@ impl FrontendClient { database_client .handler .lock() - .map_err(|e| { - UnexpectedSnafu { - reason: format!("Failed to lock database client: {e}"), - } - .build() - })? + .unwrap() .as_ref() .context(UnexpectedSnafu { reason: "Standalone's frontend instance is not set", diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index 7b517354fe..7541b191fa 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -272,14 +272,29 @@ fn collect_region_watermarks(plan: Arc) -> Vec, entries: impl IntoIterator, ) { - // Merge by correctness rather than by maximum/minimum sequence. Any - // explicit unproved entry (`None`) vetoes a proved watermark, and - // conflicting proofs degrade the region back to unproved so Flow cannot - // advance checkpoints from ambiguous terminal metrics. for entry in entries { merged .entry(entry.region_id)