From ace65a380438067bc7dd4a993bb27bd62aadcd27 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 27 Mar 2026 17:33:23 +0800 Subject: [PATCH] fix: only scan bind once Signed-off-by: discord9 --- src/query/src/dummy_catalog.rs | 64 ++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index af485a6142..7cd01d4ceb 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -183,6 +183,10 @@ impl TableProvider for DummyTableProvider { request.filters = filters.to_vec(); request.limit = limit; + if let Some(query_ctx) = &self.query_ctx { + reuse_bound_snapshot_sequence(query_ctx, self.region_id, &mut request); + } + let scanner = self .engine .handle_query(self.region_id, request.clone()) @@ -330,6 +334,8 @@ fn scan_request_from_query_context( ..Default::default() }; + reuse_bound_snapshot_sequence(query_ctx, region_id, &mut scan_request); + let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?; let should_apply_incremental = flow_extensions.validate_for_scan(region_id)?; @@ -346,6 +352,17 @@ fn scan_request_from_query_context( Ok(scan_request) } +fn reuse_bound_snapshot_sequence( + query_ctx: &QueryContext, + region_id: RegionId, + scan_request: &mut ScanRequest, +) { + if let Some(snapshot_sequence) = query_ctx.get_snapshot(region_id.as_u64()) { + scan_request.memtable_max_sequence = Some(snapshot_sequence); + scan_request.snapshot_on_scan = false; + } +} + fn query_requires_snapshot_bound(query_ctx: &QueryContext) -> bool { FlowQueryExtensions::from_extensions(&query_ctx.extensions()) .map(|extensions| extensions.should_collect_region_watermark()) @@ -540,8 +557,8 @@ mod tests { let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); - assert!(request.snapshot_on_scan); - assert_eq!(request.memtable_max_sequence, None); + assert!(!request.snapshot_on_scan); + assert_eq!(request.memtable_max_sequence, Some(42)); assert_eq!(request.sst_min_sequence, Some(7)); } @@ -577,12 +594,53 @@ mod tests { .build(); let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); - assert_eq!(request.memtable_max_sequence, None); + assert_eq!(request.memtable_max_sequence, Some(100)); assert_eq!(request.sst_min_sequence, Some(90)); assert_eq!(request.memtable_min_sequence, None); assert!(!request.snapshot_on_scan); } + #[test] + fn test_scan_request_from_query_context_reuses_existing_snapshot_for_incremental_scan() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + format!(r#"{{"{}":10}}"#, region_id.as_u64()), + )])) + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 42_u64, + )])))) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + + assert_eq!(request.memtable_min_sequence, Some(10)); + assert_eq!(request.memtable_max_sequence, Some(42)); + assert!(!request.snapshot_on_scan); + } + + #[test] + fn test_reuse_bound_snapshot_sequence_updates_cached_scan_request() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 88_u64, + )])))) + .build(); + let mut request = ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }; + + reuse_bound_snapshot_sequence(&query_ctx, region_id, &mut request); + + assert_eq!(request.memtable_max_sequence, Some(88)); + assert!(!request.snapshot_on_scan); + } + #[test] fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() { let region_id = test_region_id();