fix: only scan bind once

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-27 17:33:23 +08:00
parent a7bef32bb0
commit ace65a3804

View File

@@ -183,6 +183,10 @@ impl TableProvider for DummyTableProvider {
request.filters = filters.to_vec();
request.limit = limit;
if let Some(query_ctx) = &self.query_ctx {
reuse_bound_snapshot_sequence(query_ctx, self.region_id, &mut request);
}
let scanner = self
.engine
.handle_query(self.region_id, request.clone())
@@ -330,6 +334,8 @@ fn scan_request_from_query_context(
..Default::default()
};
reuse_bound_snapshot_sequence(query_ctx, region_id, &mut scan_request);
let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?;
let should_apply_incremental = flow_extensions.validate_for_scan(region_id)?;
@@ -346,6 +352,17 @@ fn scan_request_from_query_context(
Ok(scan_request)
}
fn reuse_bound_snapshot_sequence(
query_ctx: &QueryContext,
region_id: RegionId,
scan_request: &mut ScanRequest,
) {
if let Some(snapshot_sequence) = query_ctx.get_snapshot(region_id.as_u64()) {
scan_request.memtable_max_sequence = Some(snapshot_sequence);
scan_request.snapshot_on_scan = false;
}
}
fn query_requires_snapshot_bound(query_ctx: &QueryContext) -> bool {
FlowQueryExtensions::from_extensions(&query_ctx.extensions())
.map(|extensions| extensions.should_collect_region_watermark())
@@ -540,8 +557,8 @@ mod tests {
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert!(request.snapshot_on_scan);
assert_eq!(request.memtable_max_sequence, None);
assert!(!request.snapshot_on_scan);
assert_eq!(request.memtable_max_sequence, Some(42));
assert_eq!(request.sst_min_sequence, Some(7));
}
@@ -577,12 +594,53 @@ mod tests {
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert_eq!(request.memtable_max_sequence, None);
assert_eq!(request.memtable_max_sequence, Some(100));
assert_eq!(request.sst_min_sequence, Some(90));
assert_eq!(request.memtable_min_sequence, None);
assert!(!request.snapshot_on_scan);
}
#[test]
fn test_scan_request_from_query_context_reuses_existing_snapshot_for_incremental_scan() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
format!(r#"{{"{}":10}}"#, region_id.as_u64()),
)]))
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
42_u64,
)]))))
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert_eq!(request.memtable_min_sequence, Some(10));
assert_eq!(request.memtable_max_sequence, Some(42));
assert!(!request.snapshot_on_scan);
}
#[test]
fn test_reuse_bound_snapshot_sequence_updates_cached_scan_request() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
88_u64,
)]))))
.build();
let mut request = ScanRequest {
snapshot_on_scan: true,
..Default::default()
};
reuse_bound_snapshot_sequence(&query_ctx, region_id, &mut request);
assert_eq!(request.memtable_max_sequence, Some(88));
assert!(!request.snapshot_on_scan);
}
#[test]
fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() {
let region_id = test_region_id();