diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 7cd01d4ceb..61fddbf46f 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -184,7 +184,7 @@ impl TableProvider for DummyTableProvider { request.limit = limit; if let Some(query_ctx) = &self.query_ctx { - reuse_bound_snapshot_sequence(query_ctx, self.region_id, &mut request); + apply_cached_snapshot_to_request(query_ctx, self.region_id, &mut request); } let scanner = self @@ -328,31 +328,96 @@ fn scan_request_from_query_context( region_id: RegionId, query_ctx: &QueryContext, ) -> Result { - let mut scan_request = ScanRequest { - sst_min_sequence: query_ctx.sst_min_sequence(region_id.as_u64()), - snapshot_on_scan: query_requires_snapshot_bound(query_ctx), - ..Default::default() - }; - - reuse_bound_snapshot_sequence(query_ctx, region_id, &mut scan_request); - - let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?; - - let should_apply_incremental = flow_extensions.validate_for_scan(region_id)?; - if should_apply_incremental - && let Some(after_seq) = flow_extensions - .incremental_after_seqs - .as_ref() - .and_then(|seqs| seqs.get(&region_id.as_u64())) - .copied() - { - scan_request.memtable_min_sequence = Some(after_seq); - } + let decision = decide_flow_scan(query_ctx, region_id)?; + let mut scan_request = build_scan_request(query_ctx, region_id, &decision); + apply_cached_snapshot(query_ctx, region_id, &decision, &mut scan_request); Ok(scan_request) } -fn reuse_bound_snapshot_sequence( +#[derive(Debug, Clone, PartialEq, Eq)] +struct FlowScanDecision { + /// Whether this region is the flow sink-table scan. + is_sink_scan: bool, + /// Whether this region should bind a snapshot upper bound on scan open. + snapshot_on_scan: bool, + /// Optional lower exclusive memtable sequence bound. 
+ memtable_min_sequence: Option, +} + +impl FlowScanDecision { + fn plain_scan() -> Self { + Self { + is_sink_scan: true, + snapshot_on_scan: false, + memtable_min_sequence: None, + } + } +} + +fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result { + let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?; + + // Fast path for normal queries: no flow-related scan context at all. + if !flow_extensions.has_flow_context() { + return Ok(FlowScanDecision { + is_sink_scan: false, + snapshot_on_scan: false, + memtable_min_sequence: None, + }); + } + + // Sink-table scans intentionally bypass all flow scan semantics. + if flow_extensions.sink_table_id == Some(region_id.table_id()) { + return Ok(FlowScanDecision::plain_scan()); + } + + let apply_incremental = flow_extensions.validate_for_scan(region_id)?; + + let memtable_min_sequence = if apply_incremental { + flow_extensions + .incremental_after_seqs + .as_ref() + .and_then(|seqs| seqs.get(&region_id.as_u64())) + .copied() + } else { + None + }; + + Ok(FlowScanDecision { + is_sink_scan: false, + snapshot_on_scan: flow_extensions.should_collect_region_watermark(), + memtable_min_sequence, + }) +} + +fn build_scan_request( + query_ctx: &QueryContext, + region_id: RegionId, + decision: &FlowScanDecision, +) -> ScanRequest { + ScanRequest { + sst_min_sequence: query_ctx.sst_min_sequence(region_id.as_u64()), + snapshot_on_scan: decision.snapshot_on_scan, + memtable_min_sequence: decision.memtable_min_sequence, + ..Default::default() + } +} + +fn apply_cached_snapshot( + query_ctx: &QueryContext, + region_id: RegionId, + decision: &FlowScanDecision, + scan_request: &mut ScanRequest, +) { + if decision.is_sink_scan { + return; + } + + apply_cached_snapshot_to_request(query_ctx, region_id, scan_request); +} + +fn apply_cached_snapshot_to_request( query_ctx: &QueryContext, region_id: RegionId, scan_request: &mut ScanRequest, @@ -363,12 +428,6 @@ fn reuse_bound_snapshot_sequence( } 
} -fn query_requires_snapshot_bound(query_ctx: &QueryContext) -> bool { - FlowQueryExtensions::from_extensions(&query_ctx.extensions()) - .map(|extensions| extensions.should_collect_region_watermark()) - .unwrap_or(false) -} - fn bind_snapshot_bound_region_seq( query_ctx: &QueryContext, region_id: RegionId, @@ -622,7 +681,7 @@ mod tests { } #[test] - fn test_reuse_bound_snapshot_sequence_updates_cached_scan_request() { + fn test_apply_cached_snapshot_to_request_updates_cached_scan_request() { let region_id = test_region_id(); let query_ctx = QueryContextBuilder::default() .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( @@ -635,7 +694,7 @@ mod tests { ..Default::default() }; - reuse_bound_snapshot_sequence(&query_ctx, region_id, &mut request); + apply_cached_snapshot_to_request(&query_ctx, region_id, &mut request); assert_eq!(request.memtable_max_sequence, Some(88)); assert!(!request.snapshot_on_scan); @@ -706,10 +765,16 @@ mod tests { region_id.table_id().to_string(), ), ])) + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 88_u64, + )])))) .build(); let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); assert_eq!(request.memtable_min_sequence, None); + assert_eq!(request.memtable_max_sequence, None); + assert!(!request.snapshot_on_scan); } #[test] diff --git a/src/query/src/options.rs b/src/query/src/options.rs index 9b60b64759..2e1be0d79a 100644 --- a/src/query/src/options.rs +++ b/src/query/src/options.rs @@ -162,6 +162,13 @@ impl FlowQueryExtensions { Ok(self.incremental_after_seqs.is_some()) } + pub fn has_flow_context(&self) -> bool { + self.incremental_after_seqs.is_some() + || self.incremental_mode.is_some() + || self.return_region_seq + || self.sink_table_id.is_some() + } + pub fn should_collect_region_watermark(&self) -> bool { self.return_region_seq || self.incremental_after_seqs.is_some() }