refactor: pre review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-13 14:38:04 +08:00
parent 3aa82a46b6
commit 6fcd9e4d17
2 changed files with 80 additions and 36 deletions

View File

@@ -363,18 +363,16 @@ impl FlowScanDecision {
}
fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<FlowScanDecision> {
let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?;
// Fast path for normal queries: no flow-related scan context at all, so keep
// the request as a plain scan and avoid any flow-specific decision making.
if !flow_extensions.has_flow_context() {
let Some(flow_extensions) =
FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())?
else {
return Ok(FlowScanDecision {
is_sink_scan: false,
snapshot_on_scan: false,
memtable_min_sequence: None,
memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()),
});
}
};
// Sink-table scans intentionally bypass all flow scan semantics. They should
// behave like plain reads and must not participate in incremental lower bounds

View File

@@ -71,7 +71,21 @@ pub struct FlowQueryExtensions {
}
impl FlowQueryExtensions {
pub fn from_extensions(extensions: &HashMap<String, String>) -> Result<Self> {
/// Parses flow-specific query extensions when any flow key is present.
///
/// Returns `Ok(None)` for ordinary queries with no flow-related extensions,
/// `Ok(Some(_))` when flow context is present and valid, and `Err(_)` when a
/// flow-related extension is present but malformed or incomplete.
pub fn parse_flow_extensions(extensions: &HashMap<String, String>) -> Result<Option<Self>> {
let has_flow_context = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS)
|| extensions.contains_key(FLOW_INCREMENTAL_MODE)
|| extensions.contains_key(FLOW_RETURN_REGION_SEQ)
|| extensions.contains_key(FLOW_SINK_TABLE_ID);
if !has_flow_context {
return Ok(None);
}
let incremental_mode = extensions
.get(FLOW_INCREMENTAL_MODE)
.map(|value| match value.as_str() {
@@ -127,12 +141,12 @@ impl FlowQueryExtensions {
}
}
Ok(Self {
Ok(Some(Self {
incremental_after_seqs,
incremental_mode,
return_region_seq,
sink_table_id,
})
}))
}
pub fn validate_for_scan(&self, source_region_id: RegionId) -> Result<bool> {
@@ -162,18 +176,6 @@ impl FlowQueryExtensions {
Ok(self.incremental_after_seqs.is_some())
}
/// Returns whether the parsed extensions carry any flow-specific scan context.
///
/// This is used as the fast-path guard for ordinary queries in `dummy_catalog`.
/// Keep this helper in sync when adding new `FlowQueryExtensions` fields that
/// influence flow scan semantics.
pub fn has_flow_context(&self) -> bool {
self.incremental_after_seqs.is_some()
|| self.incremental_mode.is_some()
|| self.return_region_seq
|| self.sink_table_id.is_some()
}
pub fn should_collect_region_watermark(&self) -> bool {
self.return_region_seq || self.incremental_after_seqs.is_some()
}
@@ -242,14 +244,11 @@ mod flow_extension_tests {
use super::*;
#[test]
fn test_parse_flow_extensions_default() {
fn test_parse_flow_extensions_returns_none_for_non_flow_query() {
let exts = HashMap::new();
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap();
assert_eq!(parsed.incremental_mode, None);
assert_eq!(parsed.incremental_after_seqs, None);
assert!(!parsed.return_region_seq);
assert_eq!(parsed.sink_table_id, None);
assert_eq!(parsed, None);
}
#[test]
@@ -267,7 +266,9 @@ mod flow_extension_tests {
(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
]);
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
.unwrap()
.unwrap();
assert_eq!(
parsed.incremental_mode,
Some(FlowIncrementalMode::MemtableOnly)
@@ -287,7 +288,7 @@ mod flow_extension_tests {
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
)]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
}
@@ -295,7 +296,7 @@ mod flow_extension_tests {
fn test_parse_flow_extensions_invalid_mode() {
let exts = HashMap::from([(FLOW_INCREMENTAL_MODE.to_string(), "foo".to_string())]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_INCREMENTAL_MODE));
}
@@ -312,7 +313,7 @@ mod flow_extension_tests {
),
]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
}
@@ -323,7 +324,9 @@ mod flow_extension_tests {
r#"{"1":"10","2":"20"}"#.to_string(),
)]);
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
.unwrap()
.unwrap();
assert_eq!(
parsed.incremental_after_seqs.unwrap(),
HashMap::from([(1, 10), (2, 20)])
@@ -337,7 +340,7 @@ mod flow_extension_tests {
r#"{"1":true}"#.to_string(),
)]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
}
@@ -345,7 +348,7 @@ mod flow_extension_tests {
fn test_parse_flow_extensions_invalid_sink_table_id() {
let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "x".to_string())]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_SINK_TABLE_ID));
}
@@ -364,7 +367,9 @@ mod flow_extension_tests {
),
]);
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
.unwrap()
.unwrap();
let err = parsed.validate_for_scan(source_region_id).unwrap_err();
assert!(format!("{err}").contains("Missing region"));
}
@@ -384,7 +389,9 @@ mod flow_extension_tests {
(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
]);
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
.unwrap()
.unwrap();
let apply_incremental = parsed.validate_for_scan(source_region_id).unwrap();
assert!(!apply_incremental);
}
@@ -412,4 +419,43 @@ mod flow_extension_tests {
};
assert!(parsed.should_collect_region_watermark());
}
#[test]
fn test_parse_flow_extensions_return_region_seq_only_returns_some() {
let exts = HashMap::from([(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string())]);
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
.unwrap()
.unwrap();
assert!(parsed.return_region_seq);
}
#[test]
fn test_parse_flow_extensions_sink_table_only_returns_some() {
let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string())]);
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
.unwrap()
.unwrap();
assert_eq!(parsed.sink_table_id, Some(1024));
}
#[test]
fn test_parse_flow_extensions_incremental_after_seqs_only_returns_some() {
let exts = HashMap::from([(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
r#"{"1":10}"#.to_string(),
)]);
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
.unwrap()
.unwrap();
assert_eq!(
parsed.incremental_after_seqs,
Some(HashMap::from([(1, 10)]))
);
}
}