diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 61fddbf46f..1902e3cd2b 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -329,20 +329,25 @@ fn scan_request_from_query_context( query_ctx: &QueryContext, ) -> Result { let decision = decide_flow_scan(query_ctx, region_id)?; - let mut scan_request = build_scan_request(query_ctx, region_id, &decision); - apply_cached_snapshot(query_ctx, region_id, &decision, &mut scan_request); - - Ok(scan_request) + Ok(build_scan_request(query_ctx, region_id, &decision)) } #[derive(Debug, Clone, PartialEq, Eq)] struct FlowScanDecision { /// Whether this region is the flow sink-table scan. + /// Sink scans intentionally bypass incremental and snapshot-binding semantics. is_sink_scan: bool, - /// Whether this region should bind a snapshot upper bound on scan open. + /// Whether this scan should bind a memtable upper bound when opening the scan. + /// This is only the initial intent; if a cached bound already exists in `query_ctx`, + /// we reuse that cached bound instead and clear this flag. snapshot_on_scan: bool, - /// Optional lower exclusive memtable sequence bound. + /// Optional lower exclusive memtable sequence bound for incremental reads. + /// When set, only rows with sequence strictly greater than this bound are read from memtables. memtable_min_sequence: Option, + /// Optional cached per-region snapshot already bound in `query_ctx`. + /// When present, this becomes the effective memtable upper bound and suppresses + /// binding a new snapshot on scan open. + memtable_max_sequence: Option, } impl FlowScanDecision { @@ -351,6 +356,7 @@ impl FlowScanDecision { is_sink_scan: true, snapshot_on_scan: false, memtable_min_sequence: None, + memtable_max_sequence: None, } } } @@ -358,16 +364,20 @@ impl FlowScanDecision { fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result { let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?; - // Fast path for normal queries: no flow-related scan context at all. + // Fast path for normal queries: no flow-related scan context at all, so keep + // the request as a plain scan and avoid any flow-specific decision making. if !flow_extensions.has_flow_context() { return Ok(FlowScanDecision { is_sink_scan: false, snapshot_on_scan: false, memtable_min_sequence: None, + memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()), }); } - // Sink-table scans intentionally bypass all flow scan semantics. + // Sink-table scans intentionally bypass all flow scan semantics. They should + // behave like plain reads and must not participate in incremental lower bounds + // or per-region snapshot binding/reuse. if flow_extensions.sink_table_id == Some(region_id.table_id()) { return Ok(FlowScanDecision::plain_scan()); } @@ -384,10 +394,14 @@ fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result ScanRequest { + // Build the initial scan request from the final decision known at provider creation + // time. A later scan may still refresh `memtable_max_sequence` if another source scan + // has bound a snapshot into `query_ctx` after this provider was created. ScanRequest { sst_min_sequence: query_ctx.sst_min_sequence(region_id.as_u64()), snapshot_on_scan: decision.snapshot_on_scan, memtable_min_sequence: decision.memtable_min_sequence, + memtable_max_sequence: decision.memtable_max_sequence, ..Default::default() } } -fn apply_cached_snapshot( - query_ctx: &QueryContext, - region_id: RegionId, - decision: &FlowScanDecision, - scan_request: &mut ScanRequest, -) { - if decision.is_sink_scan { - return; - } - - apply_cached_snapshot_to_request(query_ctx, region_id, scan_request); -} - fn apply_cached_snapshot_to_request( query_ctx: &QueryContext, region_id: RegionId, scan_request: &mut ScanRequest, ) { if let Some(snapshot_sequence) = query_ctx.get_snapshot(region_id.as_u64()) { + // Reuse the previously bound per-region snapshot instead of rebinding a new + // upper bound on scan open. This refresh is still needed at scan time because + // the provider's cached request may have been built before another source scan + // bound the shared query-level snapshot into `query_ctx`. scan_request.memtable_max_sequence = Some(snapshot_sequence); scan_request.snapshot_on_scan = false; }