diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 46bf4632db..ddfe418aec 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -19,7 +19,7 @@ use std::fmt::Debug; use std::ops::Deref; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; use std::time::Duration; @@ -71,9 +71,9 @@ use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; use store_api::region_engine::{ - RegionEngine, RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, - RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, - SettableRegionRoleState, SyncRegionFromRequest, + RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest, + RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState, + SyncRegionFromRequest, }; use store_api::region_request::{ AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest, @@ -241,16 +241,6 @@ impl RegionServer { ); let engine = status.into_engine(); - if let Some(ctx) = &ctx - && should_return_region_seq(ctx) - && ctx.get_snapshot(region_id.as_u64()).is_none() - { - let sampled_sequence = engine - .get_committed_sequence(region_id) - .await - .with_context(|_| HandleRegionRequestSnafu { region_id })?; - bind_snapshot_bound_region_seq(ctx, region_id, sampled_sequence); - } self.inner .table_provider_factory @@ -313,12 +303,8 @@ impl RegionServer { }; if let Some(seq) = region_latest_seq { - Ok(Box::pin(RegionWatermarkStream::new( - stream, - region_id, - seq, - self.find_engine(region_id)?, - )) as SendableRecordBatchStream) + Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) + as SendableRecordBatchStream) } else { Ok(stream) } @@ -789,80 +775,24 @@ fn should_return_region_seq(query_ctx: &QueryContext) -> bool { || query_ctx.extension(FLOW_INCREMENTAL_AFTER_SEQS).is_some() } -fn bind_snapshot_bound_region_seq( - query_ctx: &QueryContext, - region_id: RegionId, - sampled_sequence: u64, -) -> u64 { - if let Some(snapshot_seq) = query_ctx.get_snapshot(region_id.as_u64()) { - snapshot_seq - } else { - query_ctx.set_snapshot(region_id.as_u64(), sampled_sequence); - sampled_sequence - } -} - -/// Returns whether `snapshot_sequence` is a valid correctness watermark for this region, -/// i.e. whether this query can safely treat it as the read upper bound. -/// -/// For now this proof only covers memtable-side sequence information; SST-side proof -/// can be added later if finer sequence metadata becomes available. -fn can_prove_region_watermark_for_engine( - engine: &dyn RegionEngine, - region_id: RegionId, - snapshot_sequence: u64, -) -> bool { - if let Some(mito_engine) = engine.as_any().downcast_ref::() - && let Some(region) = mito_engine.find_region(region_id) - { - return snapshot_sequence >= region.flushed_sequence(); - } - true -} - +/// Wraps a region read stream so terminal metrics can carry the scan-open watermark. struct RegionWatermarkStream { stream: SendableRecordBatchStream, region_id: u64, snapshot_seq: u64, - proof_engine: Option, finished: AtomicBool, - proof_state: Mutex>, } impl RegionWatermarkStream { - fn new( - stream: SendableRecordBatchStream, - region_id: RegionId, - latest_sequence: u64, - proof_engine: Option, - ) -> Self { + fn new(stream: SendableRecordBatchStream, region_id: RegionId, latest_sequence: u64) -> Self { Self { stream, region_id: region_id.as_u64(), snapshot_seq: latest_sequence, - proof_engine, finished: AtomicBool::new(false), - proof_state: Mutex::new(None), } } - fn compute_proof(&self) -> bool { - self.proof_engine - .as_ref() - .map(|engine| { - can_prove_region_watermark_for_engine( - engine.as_ref(), - RegionId::from_u64(self.region_id), - self.snapshot_seq, - ) - }) - .unwrap_or(false) - } - - fn proof_passed(&self) -> bool { - self.proof_state.lock().unwrap().unwrap_or(false) - } - fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics { let entry = if let Some(entry) = metrics .region_watermarks @@ -880,12 +810,6 @@ impl RegionWatermarkStream { metrics.region_watermarks.last_mut().unwrap() }; - // TODO(discord9): Move correctness-watermark proof into the mito scan - // path. The current stream-end proof is conservatively safe, but still - // allows false negatives if a late flush happens after the scan snapshot. - if !self.proof_passed() { - return metrics; - } entry.watermark = Some(self.snapshot_seq); metrics } @@ -920,7 +844,6 @@ impl Stream for RegionWatermarkStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.stream).poll_next(cx) { Poll::Ready(None) => { - *self.proof_state.lock().unwrap() = Some(self.compute_proof()); self.finished.store(true, Ordering::Relaxed); Poll::Ready(None) } @@ -943,11 +866,24 @@ mod watermark_tests { use mito2::config::MitoConfig; use mito2::test_util::{self, CreateRequestBuilder, TestEnv}; use session::context::QueryContextBuilder; + use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use super::*; - use crate::tests::MockRegionEngine; + #[cfg(test)] + fn bind_snapshot_bound_region_seq( + query_ctx: &QueryContext, + region_id: RegionId, + sampled_sequence: u64, + ) -> u64 { + if let Some(snapshot_seq) = query_ctx.get_snapshot(region_id.as_u64()) { + snapshot_seq + } else { + query_ctx.set_snapshot(region_id.as_u64(), sampled_sequence); + sampled_sequence + } + } #[test] fn test_should_return_region_seq() { let ctx = QueryContextBuilder::default() @@ -1019,49 +955,6 @@ mod watermark_tests { assert_eq!(ctx.get_snapshot(region_id.as_u64()), Some(99)); } - #[test] - fn test_can_prove_region_watermark_for_non_mito_engine_defaults_true() { - let engine = MockRegionEngine::new(MITO_ENGINE_NAME).0; - assert!(can_prove_region_watermark_for_engine( - engine.as_ref(), - RegionId::new(1, 1), - 42, - )); - } - - #[tokio::test] - async fn test_can_prove_region_watermark_for_mito_engine_when_snapshot_ge_flushed() { - let mut env = TestEnv::with_prefix( - "test_can_prove_region_watermark_for_mito_engine_when_snapshot_ge_flushed", - ) - .await; - let engine = env.create_engine(MitoConfig::default()).await; - - let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); - let column_schemas = test_util::rows_schema(&request); - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - - let rows = Rows { - schema: column_schemas, - rows: test_util::build_rows(0, 3), - }; - test_util::put_rows(&engine, region_id, rows).await; - test_util::flush_region(&engine, region_id, None).await; - - let region = engine.find_region(region_id).unwrap(); - let snapshot_seq = region.flushed_sequence(); - - assert!(can_prove_region_watermark_for_engine( - &engine, - region_id, - snapshot_seq, - )); - } - #[tokio::test] async fn test_region_watermark_stream_only_sets_terminal_metrics() { let schema = Arc::new(Schema::new(vec![ColumnSchema::new( @@ -1076,8 +969,7 @@ mod watermark_tests { .as_stream(); let region_id = RegionId::new(42, 7); - let proof_engine = Some(MockRegionEngine::new(MITO_ENGINE_NAME).0 as RegionEngineRef); - let wrapped = RegionWatermarkStream::new(stream, region_id, 99, proof_engine); + let wrapped = RegionWatermarkStream::new(stream, region_id, 99); let mut pinned = Box::pin(wrapped); assert!(pinned.as_ref().get_ref().metrics().is_none()); @@ -1094,7 +986,7 @@ mod watermark_tests { } #[tokio::test] - async fn test_region_watermark_stream_without_proof_engine_omits_watermark() { + async fn test_region_watermark_stream_sets_watermark_from_snapshot() { let schema = Arc::new(Schema::new(vec![ColumnSchema::new( "v", ConcreteDataType::int32_datatype(), @@ -1107,7 +999,7 @@ mod watermark_tests { .as_stream(); let region_id = RegionId::new(42, 7); - let wrapped = RegionWatermarkStream::new(stream, region_id, 99, None); + let wrapped = RegionWatermarkStream::new(stream, region_id, 99); let mut pinned = Box::pin(wrapped); while pinned.next().await.is_some() {} @@ -1117,15 +1009,15 @@ mod watermark_tests { metrics.region_watermarks, vec![common_recordbatch::adapter::RegionWatermarkEntry { region_id: region_id.as_u64(), - watermark: None, + watermark: Some(99), }] ); } #[tokio::test] - async fn test_region_watermark_stream_omits_watermark_after_late_flush() { + async fn test_region_watermark_stream_keeps_watermark_after_late_flush() { let mut env = - TestEnv::with_prefix("test_region_watermark_stream_omits_watermark_after_late_flush") + TestEnv::with_prefix("test_region_watermark_stream_keeps_watermark_after_late_flush") .await; let engine = env.create_engine(MitoConfig::default()).await; @@ -1157,12 +1049,7 @@ mod watermark_tests { .unwrap() .as_stream(); - let wrapped = RegionWatermarkStream::new( - stream, - region_id, - snapshot_seq, - Some(Arc::new(engine.clone()) as RegionEngineRef), - ); + let wrapped = RegionWatermarkStream::new(stream, region_id, snapshot_seq); let mut pinned = Box::pin(wrapped); assert!(pinned.next().await.is_some()); @@ -1181,7 +1068,7 @@ mod watermark_tests { metrics.region_watermarks, vec![common_recordbatch::adapter::RegionWatermarkEntry { region_id: region_id.as_u64(), - watermark: None, + watermark: Some(snapshot_seq), }] ); } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 693ae325df..175ebef237 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -94,7 +94,7 @@ impl RegionEngine for FileRegionEngine { let stream = self.handle_query(region_id, request).await?; let metadata = self.get_metadata(region_id).await?; // We don't support enabling append mode for file engine. - let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata)); + let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata, None)); Ok(scanner) } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 6f0a2c8c6f..c7db3ea8f9 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -1012,11 +1012,16 @@ impl EngineInner { /// Handles the scan `request` and returns a [ScanRegion]. #[tracing::instrument(skip_all, fields(region_id = %region_id))] - fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result { + fn scan_region(&self, region_id: RegionId, mut request: ScanRequest) -> Result { let query_start = Instant::now(); // Reading a region doesn't need to go through the region worker thread. let region = self.find_region(region_id)?; - let version = region.version(); + let version_data = region.version_control.current(); + let version = version_data.version; + + if request.snapshot_on_scan && request.memtable_max_sequence.is_none() { + request.memtable_max_sequence = Some(version_data.committed_sequence); + } if let Some(given_seq) = request.memtable_min_sequence { let min_readable_seq = version.flushed_sequence; diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index f488d1d4c4..02fb69cbc4 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -169,6 +169,166 @@ async fn test_full_snapshot_upper_bound_does_not_constrain_sst_rows() { assert!(pretty.contains("1970-01-01T00:00:04")); } +#[tokio::test] +async fn test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open() { + let mut env = + TestEnv::with_prefix("test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open") + .await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let first_rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, first_rows).await; + + let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap(); + let scanner = engine + .scanner( + region_id, + ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); + + let second_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, second_rows).await; + + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + batches.pretty_print().unwrap(), + "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+" + ); +} + +#[tokio::test] +async fn test_snapshot_bound_query_keeps_open_snapshot_after_late_flush() { + let mut env = + TestEnv::with_prefix("test_snapshot_bound_query_keeps_open_snapshot_after_late_flush") + .await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let first_rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, first_rows).await; + + let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap(); + let scanner = engine + .scanner( + region_id, + ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); + + let second_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, second_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); +} + +#[tokio::test] +async fn test_snapshot_bound_query_keeps_correct_result_after_late_flush() { + let mut env = + TestEnv::with_prefix("test_snapshot_bound_query_keeps_correct_result_after_late_flush") + .await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let first_rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, first_rows).await; + + let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap(); + let scanner = engine + .scanner( + region_id, + ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); + + let second_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, second_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot)); + + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + batches.pretty_print().unwrap(), + "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+" + ); +} + async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await; let engine = env diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5d934afd2d..d63cfc65f7 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -38,7 +38,8 @@ use snafu::{OptionExt as _, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::region_engine::{PartitionRange, RegionScannerRef}; use store_api::storage::{ - ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector, + ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution, + TimeSeriesRowSelector, }; use table::predicate::{Predicate, build_time_range_predicate}; use tokio::sync::{Semaphore, mpsc}; @@ -149,6 +150,14 @@ impl Scanner { } } + pub(crate) fn snapshot_sequence(&self) -> Option { + match self { + Scanner::Seq(seq_scan) => seq_scan.input().snapshot_sequence, + Scanner::Unordered(unordered_scan) => unordered_scan.input().snapshot_sequence, + Scanner::Series(series_scan) => series_scan.input().snapshot_sequence, + } + } + /// Sets the target partitions for the scanner. It can controls the parallelism of the scanner. pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) { use store_api::region_engine::{PrepareRequest, RegionScanner}; @@ -551,6 +560,12 @@ impl ScanRegion { .with_merge_mode(self.version.options.merge_mode()) .with_series_row_selector(self.request.series_row_selector) .with_distribution(self.request.distribution) + .with_snapshot_sequence( + self.request + .snapshot_on_scan + .then_some(self.request.memtable_max_sequence) + .flatten(), + ) .with_flat_format(flat_format); #[cfg(feature = "vector_index")] let input = input @@ -856,6 +871,8 @@ pub struct ScanInput { pub(crate) distribution: Option, /// Whether to use flat format. pub(crate) flat_format: bool, + /// Snapshot upper bound bound at scan open and propagated back to the caller. + pub(crate) snapshot_sequence: Option, /// Whether this scan is for compaction. pub(crate) compaction: bool, #[cfg(feature = "enterprise")] @@ -893,6 +910,7 @@ impl ScanInput { series_row_selector: None, distribution: None, flat_format: false, + snapshot_sequence: None, compaction: false, #[cfg(feature = "enterprise")] extension_ranges: Vec::new(), @@ -1065,6 +1083,15 @@ impl ScanInput { self } + #[must_use] + pub(crate) fn with_snapshot_sequence( + mut self, + snapshot_sequence: Option, + ) -> Self { + self.snapshot_sequence = snapshot_sequence; + self + } + /// Sets whether this scan is for compaction. #[must_use] pub(crate) fn with_compaction(mut self, compaction: bool) -> Self { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index d2be17cc83..d58c47d22a 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -672,6 +672,10 @@ impl RegionScanner for SeqScan { fn set_logical_region(&mut self, logical_region: bool) { self.properties.set_logical_region(logical_region); } + + fn snapshot_sequence(&self) -> Option { + self.stream_ctx.input.snapshot_sequence + } } impl DisplayAs for SeqScan { diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 2d6994d0af..64b7e2dd4f 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -374,6 +374,10 @@ impl RegionScanner for SeriesScan { fn set_logical_region(&mut self, logical_region: bool) { self.properties.set_logical_region(logical_region); } + + fn snapshot_sequence(&self) -> Option { + self.stream_ctx.input.snapshot_sequence + } } impl DisplayAs for SeriesScan { diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 2d557e8871..c233b5d85c 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -505,6 +505,10 @@ impl RegionScanner for UnorderedScan { fn set_logical_region(&mut self, logical_region: bool) { self.properties.set_logical_region(logical_region); } + + fn snapshot_sequence(&self) -> Option { + self.stream_ctx.input.snapshot_sequence + } } impl DisplayAs for UnorderedScan { diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 75f612a37e..341b75ead9 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -188,6 +188,12 @@ impl TableProvider for DummyTableProvider { .handle_query(self.region_id, request.clone()) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; + if request.snapshot_on_scan + && let Some(query_ctx) = &self.query_ctx + && let Some(snapshot_sequence) = scanner.snapshot_sequence() + { + bind_snapshot_bound_region_seq(query_ctx, self.region_id, snapshot_sequence); + } let query_memory_permit = self.engine.register_query_memory_permit(); let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?; if let Some(query_ctx) = &self.query_ctx { @@ -317,8 +323,8 @@ fn scan_request_from_query_context( query_ctx: &QueryContext, ) -> Result { let mut scan_request = ScanRequest { - memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()), sst_min_sequence: query_ctx.sst_min_sequence(region_id.as_u64()), + snapshot_on_scan: query_requires_snapshot_bound(query_ctx), ..Default::default() }; @@ -338,6 +344,33 @@ fn scan_request_from_query_context( Ok(scan_request) } +fn query_requires_snapshot_bound(query_ctx: &QueryContext) -> bool { + FlowQueryExtensions::from_extensions(&query_ctx.extensions()) + .map(|extensions| extensions.should_collect_region_watermark()) + .unwrap_or(false) +} + +fn bind_snapshot_bound_region_seq( + query_ctx: &QueryContext, + region_id: RegionId, + snapshot_sequence: u64, +) -> u64 { + if let Some(existing) = query_ctx.get_snapshot(region_id.as_u64()) { + if existing != snapshot_sequence { + common_telemetry::warn!( + "conflicting snapshot sequence observed for region {} in a single query; keeping the first bound snapshot (existing={}, new={})", + region_id, + existing, + snapshot_sequence, + ); + } + existing + } else { + query_ctx.set_snapshot(region_id.as_u64(), snapshot_sequence); + snapshot_sequence + } +} + #[async_trait] impl TableProviderFactory for DummyTableProviderFactory { async fn create( @@ -471,7 +504,7 @@ impl CatalogManager for DummyCatalogManager { #[cfg(test)] mod tests { use std::collections::HashMap; - use std::sync::RwLock; + use std::sync::{Arc, RwLock}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -485,6 +518,48 @@ mod tests { RegionId::new(1024, 1) } + #[test] + fn test_scan_request_from_query_context_uses_snapshot_bound_intent() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + "flow.return_region_seq".to_string(), + "true".to_string(), + )])) + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 42_u64, + )])))) + .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 7_u64, + )])))) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + + assert!(request.snapshot_on_scan); + assert_eq!(request.memtable_max_sequence, None); + assert_eq!(request.sst_min_sequence, Some(7)); + } + + #[test] + fn test_scan_request_from_incremental_context_uses_snapshot_bound_intent() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + "flow.incremental_after_seqs".to_string(), + format!(r#"{{"{}":10}}"#, region_id.as_u64()), + )])) + .build(); + + let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); + + assert!(request.snapshot_on_scan); + assert_eq!(request.memtable_min_sequence, Some(10)); + assert_eq!(request.memtable_max_sequence, None); + } + #[test] fn test_scan_request_from_query_context_keeps_snapshot_fields() { let region_id = test_region_id(); @@ -500,9 +575,37 @@ mod tests { .build(); let request = scan_request_from_query_context(region_id, &query_ctx).unwrap(); - assert_eq!(request.memtable_max_sequence, Some(100)); + assert_eq!(request.memtable_max_sequence, None); assert_eq!(request.sst_min_sequence, Some(90)); assert_eq!(request.memtable_min_sequence, None); + assert!(!request.snapshot_on_scan); + } + + #[test] + fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default() + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 42_u64, + )])))) + .build(); + + let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99); + + assert_eq!(seq, 42); + assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(42)); + } + + #[test] + fn test_bind_snapshot_bound_region_seq_sets_snapshot_once() { + let region_id = test_region_id(); + let query_ctx = QueryContextBuilder::default().build(); + + let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99); + + assert_eq!(seq, 99); + assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(99)); } #[test] diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index b3f460d01d..84aba836ba 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -462,6 +462,10 @@ pub trait RegionScanner: Debug + DisplayAs + Send { /// Sets whether the scanner is reading a logical region. fn set_logical_region(&mut self, logical_region: bool); + + fn snapshot_sequence(&self) -> Option { + None + } } pub type RegionScannerRef = Box; @@ -945,6 +949,7 @@ pub struct SinglePartitionScanner { schema: SchemaRef, properties: ScannerProperties, metadata: RegionMetadataRef, + snapshot_sequence: Option, } impl SinglePartitionScanner { @@ -953,6 +958,7 @@ impl SinglePartitionScanner { stream: SendableRecordBatchStream, append_mode: bool, metadata: RegionMetadataRef, + snapshot_sequence: Option, ) -> Self { let schema = stream.schema(); Self { @@ -960,6 +966,7 @@ impl SinglePartitionScanner { schema, properties: ScannerProperties::default().with_append_mode(append_mode), metadata, + snapshot_sequence, } } } @@ -1019,6 +1026,10 @@ impl RegionScanner for SinglePartitionScanner { fn set_logical_region(&mut self, logical_region: bool) { self.properties.set_logical_region(logical_region); } + + fn snapshot_sequence(&self) -> Option { + self.snapshot_sequence + } } impl DisplayAs for SinglePartitionScanner { diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 6725de92e3..d072ec1b39 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -112,6 +112,8 @@ pub struct ScanRequest { /// Optional constraint on the sequence number of the rows to read. /// If set, only rows with a sequence number **lesser or equal** to this value /// will be returned. + /// This is the effective memtable upper bound used by the scan, whether provided + /// explicitly or bound on scan open. pub memtable_max_sequence: Option, /// Optional constraint on the minimal sequence number in the memtable. /// If set, only the memtables that contain sequences **greater than** this value will be scanned @@ -119,6 +121,8 @@ pub struct ScanRequest { /// Optional constraint on the minimal sequence number in the SST files. /// If set, only the SST files that contain sequences greater than this value will be scanned. pub sst_min_sequence: Option, + /// Whether to bind the effective snapshot upper bound when opening the scan. + pub snapshot_on_scan: bool, /// Optional hint for the distribution of time-series data. pub distribution: Option, /// Optional hint for KNN vector search. When set, the scan should use @@ -195,6 +199,14 @@ impl Display for ScanRequest { sst_min_sequence )?; } + if self.snapshot_on_scan { + write!( + f, + "{}snapshot_on_scan: {}", + delimiter.as_str(), + self.snapshot_on_scan + )?; + } if let Some(distribution) = &self.distribution { write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?; } @@ -278,5 +290,14 @@ mod tests { request.to_string(), "ScanRequest { force_flat_format: true }" ); + + let request = ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }; + assert_eq!( + request.to_string(), + "ScanRequest { snapshot_on_scan: true }" + ); } } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index e2d8f794da..b1ca82f30d 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -599,7 +599,12 @@ mod test { .primary_key(vec![1]); let region_metadata = Arc::new(builder.build().unwrap()); - let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata)); + let scanner = Box::new(SinglePartitionScanner::new( + stream, + false, + region_metadata, + None, + )); let plan = RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap(); let actual: SchemaRef = Arc::new( plan.properties