From 8123406fae4bfd8bc04bbc55fa221bd57f64be04 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Mon, 20 Apr 2026 11:32:31 +0800 Subject: [PATCH] feat(flow): inc query scan bind seq (#7879) * feat: add sequence scan boundaries and stale detection Signed-off-by: discord9 * fix: only scan bind once Signed-off-by: discord9 * refactor: flow snapshot decision Signed-off-by: discord9 * refactor: one place modify Signed-off-by: discord9 * chore: per review Signed-off-by: discord9 * refactor: pre review Signed-off-by: discord9 * per review Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/mito2/src/engine.rs | 26 +- src/mito2/src/engine/scan_test.rs | 296 ++++++++++++++++++ src/mito2/src/error.rs | 16 + src/mito2/src/read/scan_region.rs | 29 +- src/mito2/src/read/seq_scan.rs | 4 + src/mito2/src/read/series_scan.rs | 4 + src/mito2/src/read/unordered_scan.rs | 4 + src/mito2/src/region.rs | 9 + src/query/src/dummy_catalog.rs | 449 ++++++++++++++++++++++++++- src/query/src/error.rs | 17 +- src/query/src/options.rs | 94 ++++-- 11 files changed, 916 insertions(+), 32 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 5bd1002581..b3fa6ce75b 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -129,8 +129,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::config::MitoConfig; use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin}; use crate::error::{ - InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, - SerdeJsonSnafu, SerializeColumnMetadataSnafu, + IncrementalQueryStaleSnafu, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, + RegionNotFoundSnafu, Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu, }; #[cfg(feature = "enterprise")] use crate::extension::BoxedExtensionRangeProviderFactory; @@ -1013,11 +1013,29 @@ impl EngineInner { /// Handles the scan `request` and returns a [ScanRegion]. #[tracing::instrument(skip_all, fields(region_id = %region_id))] - fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result { + fn scan_region(&self, region_id: RegionId, mut request: ScanRequest) -> Result { let query_start = Instant::now(); // Reading a region doesn't need to go through the region worker thread. let region = self.find_region(region_id)?; - let version = region.version(); + let version_data = region.version_control.current(); + let version = version_data.version; + + if request.snapshot_on_scan && request.memtable_max_sequence.is_none() { + request.memtable_max_sequence = Some(version_data.committed_sequence); + } + + if let Some(given_seq) = request.memtable_min_sequence { + let min_readable_seq = version.flushed_sequence; + ensure!( + given_seq >= min_readable_seq, + IncrementalQueryStaleSnafu { + region_id, + given_seq, + min_readable_seq, + } + ); + } + // Get cache. let cache_manager = self.workers.cache_manager(); diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index a39761ad01..d4d48b9fe6 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -27,16 +27,312 @@ use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution}; use crate::config::MitoConfig; +use crate::error::Error; use crate::read::scan_region::Scanner; use crate::test_util; use crate::test_util::{CreateRequestBuilder, TestEnv}; +#[tokio::test] +async fn test_incremental_query_stale_error() { + let mut env = TestEnv::with_prefix("test_incremental_query_stale_error").await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, rows).await; + test_util::flush_region(&engine, region_id, None).await; + + let err = engine + .scanner( + region_id, + ScanRequest { + memtable_min_sequence: Some(0), + ..Default::default() + }, + ) + .await + .err() + .expect("expect stale incremental error"); + + let min_readable_seq = match &err { + Error::IncrementalQueryStale { + region_id: err_region_id, + given_seq, + min_readable_seq, + .. + } => { + assert_eq!(*err_region_id, region_id); + assert_eq!(*given_seq, 0); + assert!(*min_readable_seq > 0); + *min_readable_seq + } + _ => panic!("unexpected err: {err}"), + }; + assert_eq!(StatusCode::RequestOutdated, err.status_code()); + let err_msg = err.to_string(); + assert!(err_msg.contains("STALE_CURSOR")); + assert!(err_msg.contains(®ion_id.to_string())); + assert!(err_msg.contains("given_seq: 0")); + assert!(err_msg.contains(&format!("min_readable_seq: {min_readable_seq}"))); + + let incremental_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, incremental_rows).await; + + let scanner = engine + .scanner( + region_id, + ScanRequest { + memtable_min_sequence: Some(min_readable_seq), + sst_min_sequence: Some(u64::MAX), + ..Default::default() + }, + ) + .await + .unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + batches.pretty_print().unwrap(), + "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | ++-------+---------+---------------------+" + ); +} + #[tokio::test] async fn test_scan_with_min_sst_sequence() { test_scan_with_min_sst_sequence_with_format(false).await; test_scan_with_min_sst_sequence_with_format(true).await; } +#[tokio::test] +async fn test_full_snapshot_upper_bound_does_not_constrain_sst_rows() { + let mut env = + TestEnv::with_prefix("test_full_snapshot_upper_bound_does_not_constrain_sst_rows").await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let first_rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, first_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + let snapshot_upper_bound = engine.get_committed_sequence(region_id).await.unwrap(); + + let second_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, second_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + let scanner = engine + .scanner( + region_id, + ScanRequest { + memtable_max_sequence: Some(snapshot_upper_bound), + ..Default::default() + }, + ) + .await + .unwrap(); + + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let pretty = batches.pretty_print().unwrap(); + + assert!(pretty.contains("1970-01-01T00:00:03")); + assert!(pretty.contains("1970-01-01T00:00:04")); +} + +#[tokio::test] +async fn test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open() { + let mut env = + TestEnv::with_prefix("test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open") + .await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let first_rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, first_rows).await; + + let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap(); + let scanner = engine + .scanner( + region_id, + ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); + + let second_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, second_rows).await; + + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + batches.pretty_print().unwrap(), + "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+" + ); +} + +#[tokio::test] +async fn test_snapshot_bound_query_keeps_open_snapshot_after_late_flush() { + let mut env = + TestEnv::with_prefix("test_snapshot_bound_query_keeps_open_snapshot_after_late_flush") + .await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let first_rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, first_rows).await; + + let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap(); + let scanner = engine + .scanner( + region_id, + ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); + + let second_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, second_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); +} + +#[tokio::test] +async fn test_snapshot_bound_query_keeps_correct_result_after_late_flush() { + let mut env = + TestEnv::with_prefix("test_snapshot_bound_query_keeps_correct_result_after_late_flush") + .await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let first_rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, first_rows).await; + + let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap(); + let scanner = engine + .scanner( + region_id, + ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); + + let second_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, second_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); + + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + batches.pretty_print().unwrap(), + "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+" + ); +} + async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await; let engine = env diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index eb802d50b7..d7d3f656c5 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -228,6 +228,20 @@ pub enum Error { location: Location, }, + #[snafu(display( + "STALE_CURSOR: incremental query stale, region: {}, given_seq: {}, min_readable_seq: {}, retry_hint: FALLBACK_FULL_RECOMPUTE", + region_id, + given_seq, + min_readable_seq + ))] + IncrementalQueryStale { + region_id: RegionId, + given_seq: u64, + min_readable_seq: u64, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Old manifest missing for region {}", region_id))] MissingOldManifest { region_id: RegionId, @@ -1298,6 +1312,8 @@ impl ErrorExt for Error { | SerializePartitionExpr { .. } | InvalidSourceAndTargetRegion { .. } => StatusCode::InvalidArguments, + IncrementalQueryStale { .. } => StatusCode::RequestOutdated, + RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index e77b9422ab..99a78a5448 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -38,7 +38,8 @@ use snafu::{OptionExt as _, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::region_engine::{PartitionRange, RegionScannerRef}; use store_api::storage::{ - ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector, + ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution, + TimeSeriesRowSelector, }; use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr}; use tokio::sync::{Semaphore, mpsc}; @@ -147,6 +148,14 @@ impl Scanner { } } + pub(crate) fn snapshot_sequence(&self) -> Option { + match self { + Scanner::Seq(seq_scan) => seq_scan.input().snapshot_sequence, + Scanner::Unordered(unordered_scan) => unordered_scan.input().snapshot_sequence, + Scanner::Series(series_scan) => series_scan.input().snapshot_sequence, + } + } + /// Sets the target partitions for the scanner. It can controls the parallelism of the scanner. pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) { use store_api::region_engine::{PrepareRequest, RegionScanner}; @@ -523,6 +532,12 @@ impl ScanRegion { .with_distribution(self.request.distribution) .with_explain_flat_format( self.version.options.sst_format == Some(crate::sst::FormatType::Flat), + ) + .with_snapshot_sequence( + self.request + .snapshot_on_scan + .then_some(self.request.memtable_max_sequence) + .flatten(), ); #[cfg(feature = "vector_index")] let input = input @@ -826,6 +841,8 @@ pub struct ScanInput { pub(crate) distribution: Option, /// Whether the region's configured SST format is flat. explain_flat_format: bool, + /// Snapshot upper bound bound at scan open and propagated back to the caller. + pub(crate) snapshot_sequence: Option, /// Whether this scan is for compaction. pub(crate) compaction: bool, #[cfg(feature = "enterprise")] @@ -862,6 +879,7 @@ impl ScanInput { series_row_selector: None, distribution: None, explain_flat_format: false, + snapshot_sequence: None, compaction: false, #[cfg(feature = "enterprise")] extension_ranges: Vec::new(), @@ -1024,6 +1042,15 @@ impl ScanInput { self } + #[must_use] + pub(crate) fn with_snapshot_sequence( + mut self, + snapshot_sequence: Option, + ) -> Self { + self.snapshot_sequence = snapshot_sequence; + self + } + /// Sets whether this scan is for compaction. #[must_use] pub(crate) fn with_compaction(mut self, compaction: bool) -> Self { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 43e41c100f..fecd50564e 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -572,6 +572,10 @@ impl RegionScanner for SeqScan { fn set_logical_region(&mut self, logical_region: bool) { self.properties.set_logical_region(logical_region); } + + fn snapshot_sequence(&self) -> Option { + self.stream_ctx.input.snapshot_sequence + } } impl DisplayAs for SeqScan { diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 7883c1d553..003a754363 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -380,6 +380,10 @@ impl RegionScanner for SeriesScan { fn set_logical_region(&mut self, logical_region: bool) { self.properties.set_logical_region(logical_region); } + + fn snapshot_sequence(&self) -> Option { + self.stream_ctx.input.snapshot_sequence + } } impl DisplayAs for SeriesScan { diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 9763d14cd2..cb6e850439 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -351,6 +351,10 @@ impl RegionScanner for UnorderedScan { fn set_logical_region(&mut self, logical_region: bool) { self.properties.set_logical_region(logical_region); } + + fn snapshot_sequence(&self) -> Option { + self.stream_ctx.input.snapshot_sequence + } } impl DisplayAs for UnorderedScan { diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 3804b28afb..2e7b7a8ae5 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -313,6 +313,15 @@ impl MitoRegion { self.version_control.committed_sequence() } + /// Returns the latest sequence that has already been persisted into SSTs. + /// + /// Incremental memtable-only reads must use a cursor greater than or equal to + /// this boundary; older cursors are stale because the corresponding updates may + /// already have been flushed out of memtables. + pub fn flushed_sequence(&self) -> SequenceNumber { + self.version_control.current().version.flushed_sequence + } + /// Returns whether the region is readonly. pub fn is_follower(&self) -> bool { self.manifest_ctx.state.load() == RegionRoleState::Follower diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 591d068f52..15001a81fa 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -43,6 +43,7 @@ use table::metadata::{TableId, TableInfoRef}; use table::table::scan::RegionScanExec; use crate::error::{GetRegionMetadataSnafu, Result}; +use crate::options::FlowQueryExtensions; /// Resolve to the given region (specified by [RegionId]) unconditionally. #[derive(Clone, Debug)] @@ -182,11 +183,26 @@ impl TableProvider for DummyTableProvider { request.filters = filters.to_vec(); request.limit = limit; + if let Some(query_ctx) = &self.query_ctx { + let is_sink_scan = is_sink_scan(query_ctx, self.region_id) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + apply_cached_snapshot_to_request(query_ctx, self.region_id, is_sink_scan, &mut request); + } + let scanner = self .engine .handle_query(self.region_id, request.clone()) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; + + if request.snapshot_on_scan + && let Some(query_ctx) = &self.query_ctx + && let Some(snapshot_sequence) = scanner.snapshot_sequence() + { + bind_snapshot_bound_region_seq(query_ctx, self.region_id, snapshot_sequence) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + let query_memory_tracker = self.engine.query_memory_tracker(); let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_tracker)?; if let Some(query_ctx) = &self.query_ctx { @@ -295,14 +311,11 @@ impl DummyTableProviderFactory { region_id, })?; - let scan_request = query_ctx - .as_ref() - .map(|ctx| ScanRequest { - memtable_max_sequence: ctx.get_snapshot(region_id.as_u64()), - sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()), - ..Default::default() - }) - .unwrap_or_default(); + let scan_request = if let Some(ctx) = query_ctx.as_ref() { + scan_request_from_query_context(region_id, ctx)? + } else { + ScanRequest::default() + }; Ok(DummyTableProvider { region_id, @@ -314,6 +327,152 @@ impl DummyTableProviderFactory { } } +fn scan_request_from_query_context( + region_id: RegionId, + query_ctx: &QueryContext, +) -> Result { + let decision = decide_flow_scan(query_ctx, region_id)?; + Ok(build_scan_request(query_ctx, region_id, &decision)) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct FlowScanDecision { + /// Whether this region is the flow sink-table scan. + /// Sink scans intentionally bypass incremental and snapshot-binding semantics. + is_sink_scan: bool, + /// Whether this scan should bind a memtable upper bound when opening the scan. + /// This is only the initial intent; if a cached bound already exists in `query_ctx`, + /// we reuse that cached bound instead and clear this flag. + snapshot_on_scan: bool, + /// Optional lower exclusive memtable sequence bound for incremental reads. + /// When set, only rows with sequence strictly greater than this bound are read from memtables. + memtable_min_sequence: Option, + /// Optional cached per-region snapshot already bound in `query_ctx`. + /// When present, this becomes the effective memtable upper bound and suppresses + /// binding a new snapshot on scan open. + memtable_max_sequence: Option, +} + +impl FlowScanDecision { + fn plain_scan() -> Self { + Self { + is_sink_scan: true, + snapshot_on_scan: false, + memtable_min_sequence: None, + memtable_max_sequence: None, + } + } +} + +fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result { + let Some(flow_extensions) = + FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())? + else { + return Ok(FlowScanDecision { + is_sink_scan: false, + snapshot_on_scan: false, + memtable_min_sequence: None, + memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()), + }); + }; + + // Sink-table scans intentionally bypass all flow scan semantics. They should + // behave like plain reads and must not participate in incremental lower bounds + // or per-region snapshot binding/reuse. + if flow_extensions.sink_table_id == Some(region_id.table_id()) { + return Ok(FlowScanDecision::plain_scan()); + } + + let apply_incremental = flow_extensions.validate_for_scan(region_id)?; + + let memtable_min_sequence = if apply_incremental { + flow_extensions + .incremental_after_seqs + .as_ref() + .and_then(|seqs| seqs.get(®ion_id.as_u64())) + .copied() + } else { + None + }; + + let memtable_max_sequence = query_ctx.get_snapshot(region_id.as_u64()); + + Ok(FlowScanDecision { + is_sink_scan: false, + snapshot_on_scan: memtable_max_sequence.is_none() + && flow_extensions.should_collect_region_watermark(), + memtable_min_sequence, + memtable_max_sequence, + }) +} + +fn build_scan_request( + query_ctx: &QueryContext, + region_id: RegionId, + decision: &FlowScanDecision, +) -> ScanRequest { + // Build the initial scan request from the final decision known at provider creation + // time. A later scan may still refresh `memtable_max_sequence` if another source scan + // has bound a snapshot into `query_ctx` after this provider was created. + ScanRequest { + sst_min_sequence: (!decision.is_sink_scan) + .then(|| query_ctx.sst_min_sequence(region_id.as_u64())) + .flatten(), + snapshot_on_scan: decision.snapshot_on_scan, + memtable_min_sequence: decision.memtable_min_sequence, + memtable_max_sequence: decision.memtable_max_sequence, + ..Default::default() + } +} + +fn is_sink_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result { + Ok( + FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())? + .is_some_and(|exts| exts.sink_table_id == Some(region_id.table_id())), + ) +} + +fn apply_cached_snapshot_to_request( + query_ctx: &QueryContext, + region_id: RegionId, + is_sink_scan: bool, + scan_request: &mut ScanRequest, +) { + if is_sink_scan { + return; + } + + if let Some(snapshot_sequence) = query_ctx.get_snapshot(region_id.as_u64()) { + // Reuse the previously bound per-region snapshot instead of rebinding a new + // upper bound on scan open. This refresh is still needed at scan time because + // the provider's cached request may have been built before another source scan + // bound the shared query-level snapshot into `query_ctx`. + scan_request.memtable_max_sequence = Some(snapshot_sequence); + scan_request.snapshot_on_scan = false; + } +} + +fn bind_snapshot_bound_region_seq( + query_ctx: &QueryContext, + region_id: RegionId, + snapshot_sequence: u64, +) -> Result { + if let Some(existing) = query_ctx.get_snapshot(region_id.as_u64()) { + if existing != snapshot_sequence { + return crate::error::ConflictingSnapshotSequenceSnafu { + region_id, + existing, + new: snapshot_sequence, + } + .fail(); + } + Ok(existing) + } else { + query_ctx.set_snapshot(region_id.as_u64(), snapshot_sequence); + Ok(snapshot_sequence) + } +} + #[async_trait] impl TableProviderFactory for DummyTableProviderFactory { async fn create( @@ -443,3 +602,277 @@ impl CatalogManager for DummyCatalogManager { Box::pin(futures::stream::empty()) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::{Arc, RwLock}; + + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; + use session::context::QueryContextBuilder; + + use super::*; + use crate::error::Error; + use crate::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_SINK_TABLE_ID}; + + fn test_region_id() -> RegionId { + RegionId::new(1024, 1) + } + + #[test] + fn test_scan_request_from_query_context_uses_snapshot_bound_intent() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + "flow.return_region_seq".to_string(), + "true".to_string(), + )])) + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 42_u64, + )])))) + .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 7_u64, + )])))) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + + assert!(!request.snapshot_on_scan); + assert_eq!(request.memtable_max_sequence, Some(42)); + assert_eq!(request.sst_min_sequence, Some(7)); + } + + #[test] + fn test_scan_request_from_incremental_context_uses_snapshot_bound_intent() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + "flow.incremental_after_seqs".to_string(), + format!(r#"{{"{}":10}}"#, region_id.as_u64()), + )])) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + + assert!(request.snapshot_on_scan); + assert_eq!(request.memtable_min_sequence, Some(10)); + assert_eq!(request.memtable_max_sequence, None); + } + + #[test] + fn test_scan_request_from_query_context_keeps_snapshot_fields() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 100, + )])))) + .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 90, + )])))) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + assert_eq!(request.memtable_max_sequence, Some(100)); + assert_eq!(request.sst_min_sequence, Some(90)); + assert_eq!(request.memtable_min_sequence, None); + assert!(!request.snapshot_on_scan); + } + + #[test] + fn test_scan_request_from_query_context_reuses_existing_snapshot_for_incremental_scan() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + format!(r#"{{"{}":10}}"#, region_id.as_u64()), + )])) + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 42_u64, + )])))) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + + assert_eq!(request.memtable_min_sequence, Some(10)); + assert_eq!(request.memtable_max_sequence, Some(42)); + assert!(!request.snapshot_on_scan); + } + + #[test] + fn test_apply_cached_snapshot_to_request_updates_cached_scan_request() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 88_u64, + )])))) + .build(); + let mut request = ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }; + + apply_cached_snapshot_to_request(&query_ctx, region_id, false, &mut request); + + assert_eq!(request.memtable_max_sequence, Some(88)); + assert!(!request.snapshot_on_scan); + } + + #[test] + fn test_apply_cached_snapshot_to_request_skips_sink_scan() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 88_u64, + )])))) + .build(); + let mut request = ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }; + + apply_cached_snapshot_to_request(&query_ctx, region_id, true, &mut request); + + assert_eq!(request.memtable_max_sequence, None); + assert!(request.snapshot_on_scan); + } + + #[test] + fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 42_u64, + )])))) + .build(); + + let err = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99).unwrap_err(); + + assert!(matches!(err, Error::ConflictingSnapshotSequence { .. })); + assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(42)); + } + + #[test] + fn test_bind_snapshot_bound_region_seq_sets_snapshot_once() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default().build(); + + let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99).unwrap(); + + assert_eq!(seq, 99); + assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(99)); + } + + #[test] + fn test_scan_request_from_query_context_applies_incremental_after_seq_for_source_scan() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + "memtable_only".to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + format!(r#"{{"{}":55}}"#, region_id.as_u64()), + ), + ])) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + assert_eq!(request.memtable_min_sequence, Some(55)); + } + + #[test] + fn test_scan_request_from_query_context_does_not_apply_incremental_for_sink_table() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + "memtable_only".to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + format!(r#"{{"{}":55}}"#, region_id.as_u64()), + ), + ( + FLOW_SINK_TABLE_ID.to_string(), + region_id.table_id().to_string(), + ), + ])) + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 88_u64, + )])))) + .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 77_u64, + )])))) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + assert_eq!(request.memtable_min_sequence, None); + assert_eq!(request.memtable_max_sequence, None); + assert_eq!(request.sst_min_sequence, None); + assert!(!request.snapshot_on_scan); + } + + #[test] + fn test_scan_request_from_query_context_rejects_missing_memtable_only_region() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + "memtable_only".to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"9":55}"#.to_string(), + ), + ])) + .build(); + + let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err(); + assert!(matches!(err, Error::InvalidQueryContextExtension { .. })); + } + + #[test] + fn test_scan_request_from_query_context_rejects_invalid_incremental_json() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + "not-json".to_string(), + )])) + .build(); + + let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err(); + assert!(matches!(err, Error::InvalidQueryContextExtension { .. })); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_scan_request_from_query_context_rejects_invalid_sink_table_id() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_SINK_TABLE_ID.to_string(), + "abc".to_string(), + )])) + .build(); + + let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err(); + assert!(matches!(err, Error::InvalidQueryContextExtension { .. })); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } +} diff --git a/src/query/src/error.rs b/src/query/src/error.rs index b3a4ebeba5..6d2a81bd73 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -375,6 +375,20 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Conflicting snapshot sequence observed for region {} in a single query (existing={}, new={})", + region_id, + existing, + new + ))] + ConflictingSnapshotSequence { + region_id: RegionId, + existing: u64, + new: u64, + #[snafu(implicit)] + location: Location, + }, + #[snafu(transparent)] Datatypes { source: datatypes::error::Error, @@ -407,7 +421,8 @@ impl ErrorExt for Error { | CteColumnSchemaMismatch { .. } | ConvertValue { .. } | TryIntoDuration { .. } - | InvalidQueryContextExtension { .. } => StatusCode::InvalidArguments, + | InvalidQueryContextExtension { .. } + | ConflictingSnapshotSequence { .. } => StatusCode::InvalidArguments, BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, diff --git a/src/query/src/options.rs b/src/query/src/options.rs index 9b60b64759..46b8f1e413 100644 --- a/src/query/src/options.rs +++ b/src/query/src/options.rs @@ -71,7 +71,21 @@ pub struct FlowQueryExtensions { } impl FlowQueryExtensions { - pub fn from_extensions(extensions: &HashMap) -> Result { + /// Parses flow-specific query extensions when any flow key is present. + /// + /// Returns `Ok(None)` for ordinary queries with no flow-related extensions, + /// `Ok(Some(_))` when flow context is present and valid, and `Err(_)` when a + /// flow-related extension is present but malformed or incomplete. + pub fn parse_flow_extensions(extensions: &HashMap) -> Result> { + let has_flow_context = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS) + || extensions.contains_key(FLOW_INCREMENTAL_MODE) + || extensions.contains_key(FLOW_RETURN_REGION_SEQ) + || extensions.contains_key(FLOW_SINK_TABLE_ID); + + if !has_flow_context { + return Ok(None); + } + let incremental_mode = extensions .get(FLOW_INCREMENTAL_MODE) .map(|value| match value.as_str() { @@ -127,12 +141,12 @@ impl FlowQueryExtensions { } } - Ok(Self { + Ok(Some(Self { incremental_after_seqs, incremental_mode, return_region_seq, sink_table_id, - }) + })) } pub fn validate_for_scan(&self, source_region_id: RegionId) -> Result { @@ -230,14 +244,11 @@ mod flow_extension_tests { use super::*; #[test] - fn test_parse_flow_extensions_default() { + fn test_parse_flow_extensions_returns_none_for_non_flow_query() { let exts = HashMap::new(); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap(); - assert_eq!(parsed.incremental_mode, None); - assert_eq!(parsed.incremental_after_seqs, None); - assert!(!parsed.return_region_seq); - assert_eq!(parsed.sink_table_id, None); + assert_eq!(parsed, None); } #[test] @@ -255,7 +266,9 @@ mod flow_extension_tests { (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()), ]); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); assert_eq!( parsed.incremental_mode, Some(FlowIncrementalMode::MemtableOnly) @@ -275,7 +288,7 @@ mod flow_extension_tests { FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), )]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS)); } @@ -283,7 +296,7 @@ mod flow_extension_tests { fn test_parse_flow_extensions_invalid_mode() { let exts = HashMap::from([(FLOW_INCREMENTAL_MODE.to_string(), "foo".to_string())]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_INCREMENTAL_MODE)); } @@ -300,7 +313,7 @@ mod flow_extension_tests { ), ]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS)); } @@ -311,7 +324,9 @@ mod flow_extension_tests { r#"{"1":"10","2":"20"}"#.to_string(), )]); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); assert_eq!( parsed.incremental_after_seqs.unwrap(), HashMap::from([(1, 10), (2, 20)]) @@ -325,7 +340,7 @@ mod flow_extension_tests { r#"{"1":true}"#.to_string(), )]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS)); } @@ -333,7 +348,7 @@ mod flow_extension_tests { fn test_parse_flow_extensions_invalid_sink_table_id() { let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "x".to_string())]); - let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err(); assert!(format!("{err}").contains(FLOW_SINK_TABLE_ID)); } @@ -352,7 +367,9 @@ mod flow_extension_tests { ), ]); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); let err = parsed.validate_for_scan(source_region_id).unwrap_err(); assert!(format!("{err}").contains("Missing region")); } @@ -372,7 +389,9 @@ mod flow_extension_tests { (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()), ]); - let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); let apply_incremental = parsed.validate_for_scan(source_region_id).unwrap(); assert!(!apply_incremental); } @@ -400,4 +419,43 @@ mod flow_extension_tests { }; assert!(parsed.should_collect_region_watermark()); } + + #[test] + fn test_parse_flow_extensions_return_region_seq_only_returns_some() { + let exts = HashMap::from([(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string())]); + + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); + + assert!(parsed.return_region_seq); + } + + #[test] + fn test_parse_flow_extensions_sink_table_only_returns_some() { + let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string())]); + + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); + + assert_eq!(parsed.sink_table_id, Some(1024)); + } + + #[test] + fn test_parse_flow_extensions_incremental_after_seqs_only_returns_some() { + let exts = HashMap::from([( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":10}"#.to_string(), + )]); + + let parsed = FlowQueryExtensions::parse_flow_extensions(&exts) + .unwrap() + .unwrap(); + + assert_eq!( + parsed.incremental_after_seqs, + Some(HashMap::from([(1, 10)])) + ); + } }