refactor: one place modify

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-01 15:23:31 +08:00
parent e3a0d1769f
commit 83822730e5

View File

@@ -329,20 +329,25 @@ fn scan_request_from_query_context(
query_ctx: &QueryContext,
) -> Result<ScanRequest> {
let decision = decide_flow_scan(query_ctx, region_id)?;
let mut scan_request = build_scan_request(query_ctx, region_id, &decision);
apply_cached_snapshot(query_ctx, region_id, &decision, &mut scan_request);
Ok(scan_request)
Ok(build_scan_request(query_ctx, region_id, &decision))
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct FlowScanDecision {
/// Whether this region is the flow sink-table scan.
/// Sink scans intentionally bypass incremental and snapshot-binding semantics.
is_sink_scan: bool,
/// Whether this region should bind a snapshot upper bound on scan open.
/// Whether this scan should bind a memtable upper bound when opening the scan.
/// This is only the initial intent; if a cached bound already exists in `query_ctx`,
/// we reuse that cached bound instead and clear this flag.
snapshot_on_scan: bool,
/// Optional lower exclusive memtable sequence bound.
/// Optional lower exclusive memtable sequence bound for incremental reads.
/// When set, only rows with sequence strictly greater than this bound are read from memtables.
memtable_min_sequence: Option<u64>,
/// Optional cached per-region snapshot already bound in `query_ctx`.
/// When present, this becomes the effective memtable upper bound and suppresses
/// binding a new snapshot on scan open.
memtable_max_sequence: Option<u64>,
}
impl FlowScanDecision {
@@ -351,6 +356,7 @@ impl FlowScanDecision {
is_sink_scan: true,
snapshot_on_scan: false,
memtable_min_sequence: None,
memtable_max_sequence: None,
}
}
}
@@ -358,16 +364,20 @@ impl FlowScanDecision {
fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<FlowScanDecision> {
let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?;
// Fast path for normal queries: no flow-related scan context at all.
// Fast path for normal queries: no flow-related scan context at all, so keep
// the request as a plain scan and avoid any flow-specific decision making.
if !flow_extensions.has_flow_context() {
return Ok(FlowScanDecision {
is_sink_scan: false,
snapshot_on_scan: false,
memtable_min_sequence: None,
memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()),
});
}
// Sink-table scans intentionally bypass all flow scan semantics.
// Sink-table scans intentionally bypass all flow scan semantics. They should
// behave like plain reads and must not participate in incremental lower bounds
// or per-region snapshot binding/reuse.
if flow_extensions.sink_table_id == Some(region_id.table_id()) {
return Ok(FlowScanDecision::plain_scan());
}
@@ -384,10 +394,14 @@ fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<Flo
None
};
let memtable_max_sequence = query_ctx.get_snapshot(region_id.as_u64());
Ok(FlowScanDecision {
is_sink_scan: false,
snapshot_on_scan: flow_extensions.should_collect_region_watermark(),
snapshot_on_scan: memtable_max_sequence.is_none()
&& flow_extensions.should_collect_region_watermark(),
memtable_min_sequence,
memtable_max_sequence,
})
}
@@ -396,33 +410,28 @@ fn build_scan_request(
region_id: RegionId,
decision: &FlowScanDecision,
) -> ScanRequest {
// Build the initial scan request from the final decision known at provider creation
// time. A later scan may still refresh `memtable_max_sequence` if another source scan
// has bound a snapshot into `query_ctx` after this provider was created.
ScanRequest {
sst_min_sequence: query_ctx.sst_min_sequence(region_id.as_u64()),
snapshot_on_scan: decision.snapshot_on_scan,
memtable_min_sequence: decision.memtable_min_sequence,
memtable_max_sequence: decision.memtable_max_sequence,
..Default::default()
}
}
fn apply_cached_snapshot(
query_ctx: &QueryContext,
region_id: RegionId,
decision: &FlowScanDecision,
scan_request: &mut ScanRequest,
) {
if decision.is_sink_scan {
return;
}
apply_cached_snapshot_to_request(query_ctx, region_id, scan_request);
}
fn apply_cached_snapshot_to_request(
query_ctx: &QueryContext,
region_id: RegionId,
scan_request: &mut ScanRequest,
) {
if let Some(snapshot_sequence) = query_ctx.get_snapshot(region_id.as_u64()) {
// Reuse the previously bound per-region snapshot instead of rebinding a new
// upper bound on scan open. This refresh is still needed at scan time because
// the provider's cached request may have been built before another source scan
// bound the shared query-level snapshot into `query_ctx`.
scan_request.memtable_max_sequence = Some(snapshot_sequence);
scan_request.snapshot_on_scan = false;
}