refactor: flow snapshot decision

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-31 20:06:37 +08:00
parent 01b3c0370a
commit 72abb5169d
2 changed files with 102 additions and 30 deletions

View File

@@ -184,7 +184,7 @@ impl TableProvider for DummyTableProvider {
request.limit = limit;
if let Some(query_ctx) = &self.query_ctx {
reuse_bound_snapshot_sequence(query_ctx, self.region_id, &mut request);
apply_cached_snapshot_to_request(query_ctx, self.region_id, &mut request);
}
let scanner = self
@@ -328,31 +328,96 @@ fn scan_request_from_query_context(
region_id: RegionId,
query_ctx: &QueryContext,
) -> Result<ScanRequest> {
let mut scan_request = ScanRequest {
sst_min_sequence: query_ctx.sst_min_sequence(region_id.as_u64()),
snapshot_on_scan: query_requires_snapshot_bound(query_ctx),
..Default::default()
};
reuse_bound_snapshot_sequence(query_ctx, region_id, &mut scan_request);
let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?;
let should_apply_incremental = flow_extensions.validate_for_scan(region_id)?;
if should_apply_incremental
&& let Some(after_seq) = flow_extensions
.incremental_after_seqs
.as_ref()
.and_then(|seqs| seqs.get(&region_id.as_u64()))
.copied()
{
scan_request.memtable_min_sequence = Some(after_seq);
}
let decision = decide_flow_scan(query_ctx, region_id)?;
let mut scan_request = build_scan_request(query_ctx, region_id, &decision);
apply_cached_snapshot(query_ctx, region_id, &decision, &mut scan_request);
Ok(scan_request)
}
fn reuse_bound_snapshot_sequence(
/// Outcome of inspecting a query's flow extensions for a single region scan.
///
/// The fields carry the values that are later copied onto the scan request
/// (`snapshot_on_scan`, `memtable_min_sequence`) plus the sink-table marker
/// that callers use to skip cached-snapshot handling.
#[derive(Clone, Debug, Eq, PartialEq)]
struct FlowScanDecision {
    /// `true` when the scanned region is the flow's sink table.
    is_sink_scan: bool,
    /// `true` when the scan should bind a snapshot upper bound as it opens.
    snapshot_on_scan: bool,
    /// Exclusive lower bound on memtable sequences, if any.
    memtable_min_sequence: Option<u64>,
}

impl FlowScanDecision {
    /// Decision used for sink-table scans: `is_sink_scan` is set, no snapshot
    /// is bound on scan and no memtable lower bound is applied.
    ///
    /// Note that, despite the name, this marks the scan as a sink scan.
    fn plain_scan() -> Self {
        FlowScanDecision {
            memtable_min_sequence: None,
            snapshot_on_scan: false,
            is_sink_scan: true,
        }
    }
}
/// Resolves how a scan of `region_id` should treat the flow extensions carried
/// by `query_ctx`.
///
/// Outcomes, checked in this order:
/// 1. The extensions carry no flow context: every field of the decision is
///    left unset (`false` / `None`).
/// 2. The region's table id equals `sink_table_id`: the result of
///    `FlowScanDecision::plain_scan()` is returned and `validate_for_scan`
///    is not called.
/// 3. Otherwise `snapshot_on_scan` follows `should_collect_region_watermark()`
///    and `memtable_min_sequence` is this region's entry in
///    `incremental_after_seqs`, but only when `validate_for_scan` returns `true`.
///
/// # Errors
///
/// Propagates errors from `FlowQueryExtensions::from_extensions` and
/// `FlowQueryExtensions::validate_for_scan`.
fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<FlowScanDecision> {
    let extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?;

    // Queries without any flow-related extension take the cheap exit.
    if !extensions.has_flow_context() {
        let no_flow = FlowScanDecision {
            is_sink_scan: false,
            snapshot_on_scan: false,
            memtable_min_sequence: None,
        };
        return Ok(no_flow);
    }

    // Scans of the sink table deliberately skip every flow scan rule below.
    if Some(region_id.table_id()) == extensions.sink_table_id {
        return Ok(FlowScanDecision::plain_scan());
    }

    let use_incremental = extensions.validate_for_scan(region_id)?;
    // Only look up the per-region lower bound when incremental scanning applies.
    let memtable_min_sequence = match &extensions.incremental_after_seqs {
        Some(after_seqs) if use_incremental => after_seqs.get(&region_id.as_u64()).copied(),
        _ => None,
    };
    let snapshot_on_scan = extensions.should_collect_region_watermark();

    Ok(FlowScanDecision {
        is_sink_scan: false,
        snapshot_on_scan,
        memtable_min_sequence,
    })
}
fn build_scan_request(
query_ctx: &QueryContext,
region_id: RegionId,
decision: &FlowScanDecision,
) -> ScanRequest {
ScanRequest {
sst_min_sequence: query_ctx.sst_min_sequence(region_id.as_u64()),
snapshot_on_scan: decision.snapshot_on_scan,
memtable_min_sequence: decision.memtable_min_sequence,
..Default::default()
}
}
/// Applies the cached snapshot to `scan_request` unless `decision` marks the
/// scan as a sink-table scan, in which case the request is left untouched.
///
/// The actual work is delegated to `apply_cached_snapshot_to_request`.
fn apply_cached_snapshot(
    query_ctx: &QueryContext,
    region_id: RegionId,
    decision: &FlowScanDecision,
    scan_request: &mut ScanRequest,
) {
    if !decision.is_sink_scan {
        apply_cached_snapshot_to_request(query_ctx, region_id, scan_request);
    }
}
fn apply_cached_snapshot_to_request(
query_ctx: &QueryContext,
region_id: RegionId,
scan_request: &mut ScanRequest,
@@ -363,12 +428,6 @@ fn reuse_bound_snapshot_sequence(
}
}
/// Reports whether the flow extensions on `query_ctx` ask for a region
/// watermark to be collected.
///
/// If the extensions cannot be parsed, the error is dropped and `false` is
/// returned.
fn query_requires_snapshot_bound(query_ctx: &QueryContext) -> bool {
    match FlowQueryExtensions::from_extensions(&query_ctx.extensions()) {
        Ok(parsed) => parsed.should_collect_region_watermark(),
        // Best effort: a parse failure is treated as "no snapshot bound needed".
        Err(_) => false,
    }
}
fn bind_snapshot_bound_region_seq(
query_ctx: &QueryContext,
region_id: RegionId,
@@ -622,7 +681,7 @@ mod tests {
}
#[test]
fn test_reuse_bound_snapshot_sequence_updates_cached_scan_request() {
fn test_apply_cached_snapshot_to_request_updates_cached_scan_request() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
@@ -635,7 +694,7 @@ mod tests {
..Default::default()
};
reuse_bound_snapshot_sequence(&query_ctx, region_id, &mut request);
apply_cached_snapshot_to_request(&query_ctx, region_id, &mut request);
assert_eq!(request.memtable_max_sequence, Some(88));
assert!(!request.snapshot_on_scan);
@@ -706,10 +765,16 @@ mod tests {
region_id.table_id().to_string(),
),
]))
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
88_u64,
)]))))
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert_eq!(request.memtable_min_sequence, None);
assert_eq!(request.memtable_max_sequence, None);
assert!(!request.snapshot_on_scan);
}
#[test]

View File

@@ -162,6 +162,13 @@ impl FlowQueryExtensions {
Ok(self.incremental_after_seqs.is_some())
}
/// Returns `true` when at least one flow-related field is populated:
/// `incremental_after_seqs`, `incremental_mode`, `return_region_seq`
/// or `sink_table_id`.
pub fn has_flow_context(&self) -> bool {
    let has_incremental_fields =
        self.incremental_after_seqs.is_some() || self.incremental_mode.is_some();
    let has_seq_or_sink_fields = self.return_region_seq || self.sink_table_id.is_some();

    has_incremental_fields || has_seq_or_sink_fields
}
/// Returns `true` when `incremental_after_seqs` is present or
/// `return_region_seq` is set.
pub fn should_collect_region_watermark(&self) -> bool {
    let has_after_seqs = self.incremental_after_seqs.is_some();
    has_after_seqs || self.return_region_seq
}