more per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-05-06 12:35:17 +08:00
parent 2965487722
commit 9c106ebda6
3 changed files with 33 additions and 20 deletions

View File

@@ -352,10 +352,15 @@ where
};
}
FlightMessage::AffectedRows { .. } | FlightMessage::Schema(_) => {
yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
yield IllegalFlightMessagesSnafu {
reason: format!(
"A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}",
flight_message
)
}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
}
}

View File

@@ -196,6 +196,9 @@ impl DatabaseWithPeer {
}
impl FrontendClient {
// TODO: support more fine-grained load balancing strategies for frontend
// selection, such as AZ (availability zone) awareness, to prefer frontends
// in the same zone as the flownode and reduce cross-AZ latency.
/// scan for available frontend from metadata
pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
let Self::Distributed { meta_client, .. } = self else {
@@ -314,12 +317,7 @@ impl FrontendClient {
database_client
.handler
.lock()
.map_err(|e| {
UnexpectedSnafu {
reason: format!("Failed to lock database client: {e}"),
}
.build()
})?
.unwrap()
.as_ref()
.context(UnexpectedSnafu {
reason: "Standalone's frontend instance is not set",
@@ -392,12 +390,7 @@ impl FrontendClient {
database_client
.handler
.lock()
.map_err(|e| {
UnexpectedSnafu {
reason: format!("Failed to lock database client: {e}"),
}
.build()
})?
.unwrap()
.as_ref()
.context(UnexpectedSnafu {
reason: "Standalone's frontend instance is not set",

View File

@@ -272,14 +272,29 @@ fn collect_region_watermarks(plan: Arc<dyn ExecutionPlan>) -> Vec<RegionWatermar
finalize_region_watermarks(merged)
}
/// Merge a batch of per-region watermark entries into the global merged state.
///
/// # Merge strategy: correctness over maximum
///
/// Flow checkpoint advancement requires provable watermarks so that incremental
/// queries never miss rows. This merge uses correctness-first semantics:
///
/// | Current state | New entry | Result | Rationale |
/// |---------------|-----------------|-------------------|-----------|
/// | Participated | Proved(seq) | Proved(seq) | First proof for the region |
/// | Participated | Unproved | Unproved | One branch cannot prove → region is unsafe |
/// | Proved(old) | Proved(same) | Proved(old) | Convergent proof, keep |
/// | Proved(old) | Proved(diff) | Conflict([old,diff]) | Ambiguous → degrade to unproved |
/// | Unproved | _anything_ | Unproved | Already unsafe, stays unsafe |
/// | Conflict{..} | Proved(seq) | Conflict[...seq] | Record for diagnostics |
///
/// Using `max(old, new)` would be incorrect because it could advance a
/// checkpoint past rows that a competing MergeScan sub-stage has not yet
/// scanned, causing Flow to skip data.
fn merge_region_watermark_entries(
merged: &mut BTreeMap<u64, MergeState>,
entries: impl IntoIterator<Item = RegionWatermarkEntry>,
) {
// Merge by correctness rather than by maximum/minimum sequence. Any
// explicit unproved entry (`None`) vetoes a proved watermark, and
// conflicting proofs degrade the region back to unproved so Flow cannot
// advance checkpoints from ambiguous terminal metrics.
for entry in entries {
merged
.entry(entry.region_id)