From 6fcd9e4d17db1b8691c8a05b5ac6780b3bf31e0a Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 13 Apr 2026 14:38:04 +0800 Subject: [PATCH] refactor: pre review Signed-off-by: discord9 --- src/query/src/dummy_catalog.rs | 10 ++-- src/query/src/options.rs | 106 +++++++++++++++++++++++---------- 2 files changed, 80 insertions(+), 36 deletions(-) diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index acc219ce5f..945a2a67e0 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -363,18 +363,16 @@ impl FlowScanDecision { } fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result { - let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?; - - // Fast path for normal queries: no flow-related scan context at all, so keep - // the request as a plain scan and avoid any flow-specific decision making. - if !flow_extensions.has_flow_context() { + let Some(flow_extensions) = + FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())? + else { return Ok(FlowScanDecision { is_sink_scan: false, snapshot_on_scan: false, memtable_min_sequence: None, memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()), }); - } + }; // Sink-table scans intentionally bypass all flow scan semantics. They should // behave like plain reads and must not participate in incremental lower bounds diff --git a/src/query/src/options.rs b/src/query/src/options.rs index 0f81b40747..46b8f1e413 100644 --- a/src/query/src/options.rs +++ b/src/query/src/options.rs @@ -71,7 +71,21 @@ pub struct FlowQueryExtensions { } impl FlowQueryExtensions { - pub fn from_extensions(extensions: &HashMap) -> Result { + /// Parses flow-specific query extensions when any flow key is present. + /// + /// Returns `Ok(None)` for ordinary queries with no flow-related extensions, + /// `Ok(Some(_))` when flow context is present and valid, and `Err(_)` when a + /// flow-related extension is present but malformed or incomplete. + pub fn parse_flow_extensions(extensions: &HashMap) -> Result> { + let has_flow_context = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS) + || extensions.contains_key(FLOW_INCREMENTAL_MODE) + || extensions.contains_key(FLOW_RETURN_REGION_SEQ) + || extensions.contains_key(FLOW_SINK_TABLE_ID); + + if !has_flow_context { + return Ok(None); + } + let incremental_mode = extensions .get(FLOW_INCREMENTAL_MODE) .map(|value| match value.as_str() { @@ -127,12 +141,12 @@ impl FlowQueryExtensions { } } - Ok(Self { + Ok(Some(Self { incremental_after_seqs, incremental_mode, return_region_seq, sink_table_id, - }) + })) } pub fn validate_for_scan(&self, source_region_id: RegionId) -> Result { @@ -162,18 +176,6 @@ impl FlowQueryExtensions { Ok(self.incremental_after_seqs.is_some()) } - /// Returns whether the parsed extensions carry any flow-specific scan context. - /// - /// This is used as the fast-path guard for ordinary queries in `dummy_catalog`. - /// Keep this helper in sync when adding new `FlowQueryExtensions` fields that - /// influence flow scan semantics. - pub fn has_flow_context(&self) -> bool { - self.incremental_after_seqs.is_some() - || self.incremental_mode.is_some() - || self.return_region_seq - || self.sink_table_id.is_some() - } - pub fn should_collect_region_watermark(&self) -> bool { self.return_region_seq || self.incremental_after_seqs.is_some() } @@ -242,14 +244,11 @@ mod flow_extension_tests { use super::*; #[test] - fn test_parse_flow_extensions_default() { + fn test_parse_flow_extensions_returns_none_for_non_flow_query() { let exts = HashMap::new(); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap(); - assert_eq!(parsed.incremental_mode, None); - assert_eq!(parsed.incremental_after_seqs, None); - assert!(!parsed.return_region_seq); - assert_eq!(parsed.sink_table_id, None); + assert_eq!(parsed, None); } #[test] @@ -267,7 +266,9 @@ mod flow_extension_tests { (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()), ]); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); assert_eq!( parsed.incremental_mode, Some(FlowIncrementalMode::MemtableOnly) @@ -287,7 +288,7 @@ mod flow_extension_tests { FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), )]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS)); } @@ -295,7 +296,7 @@ mod flow_extension_tests { fn test_parse_flow_extensions_invalid_mode() { let exts = HashMap::from([(FLOW_INCREMENTAL_MODE.to_string(), "foo".to_string())]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_INCREMENTAL_MODE)); } @@ -312,7 +313,7 @@ mod flow_extension_tests { ), ]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS)); } @@ -323,7 +324,9 @@ mod flow_extension_tests { r#"{"1":"10","2":"20"}"#.to_string(), )]); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); assert_eq!( parsed.incremental_after_seqs.unwrap(), HashMap::from([(1, 10), (2, 20)]) @@ -337,7 +340,7 @@ mod flow_extension_tests { r#"{"1":true}"#.to_string(), )]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS)); } @@ -345,7 +348,7 @@ mod flow_extension_tests { fn test_parse_flow_extensions_invalid_sink_table_id() { let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "x".to_string())]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_SINK_TABLE_ID)); } @@ -364,7 +367,9 @@ mod flow_extension_tests { ), ]); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); let err = parsed.validate_for_scan(source_region_id).unwrap_err(); assert!(format!("{err}").contains("Missing region")); } @@ -384,7 +389,9 @@ mod flow_extension_tests { (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()), ]); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); let apply_incremental = parsed.validate_for_scan(source_region_id).unwrap(); assert!(!apply_incremental); } @@ -412,4 +419,43 @@ mod flow_extension_tests { }; assert!(parsed.should_collect_region_watermark()); } + + #[test] + fn test_parse_flow_extensions_return_region_seq_only_returns_some() { + let exts = HashMap::from([(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string())]); + + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); + + assert!(parsed.return_region_seq); + } + + #[test] + fn test_parse_flow_extensions_sink_table_only_returns_some() { + let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string())]); + + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); + + assert_eq!(parsed.sink_table_id, Some(1024)); + } + + #[test] + fn test_parse_flow_extensions_incremental_after_seqs_only_returns_some() { + let exts = HashMap::from([( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":10}"#.to_string(), + )]); + + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); + + assert_eq!( + parsed.incremental_after_seqs, + Some(HashMap::from([(1, 10)])) + ); + } }