feat: better seq snapshot loc

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-20 16:18:33 +08:00
parent 9fdc6b2d3c
commit 576c802394
12 changed files with 382 additions and 151 deletions

View File

@@ -19,7 +19,7 @@ use std::fmt::Debug;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
@@ -71,9 +71,9 @@ use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{
RegionEngine, RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncRegionFromRequest,
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
SyncRegionFromRequest,
};
use store_api::region_request::{
AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
@@ -241,16 +241,6 @@ impl RegionServer {
);
let engine = status.into_engine();
if let Some(ctx) = &ctx
&& should_return_region_seq(ctx)
&& ctx.get_snapshot(region_id.as_u64()).is_none()
{
let sampled_sequence = engine
.get_committed_sequence(region_id)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?;
bind_snapshot_bound_region_seq(ctx, region_id, sampled_sequence);
}
self.inner
.table_provider_factory
@@ -313,12 +303,8 @@ impl RegionServer {
};
if let Some(seq) = region_latest_seq {
Ok(Box::pin(RegionWatermarkStream::new(
stream,
region_id,
seq,
self.find_engine(region_id)?,
)) as SendableRecordBatchStream)
Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq))
as SendableRecordBatchStream)
} else {
Ok(stream)
}
@@ -789,80 +775,24 @@ fn should_return_region_seq(query_ctx: &QueryContext) -> bool {
|| query_ctx.extension(FLOW_INCREMENTAL_AFTER_SEQS).is_some()
}
/// Resolves the snapshot sequence to use for `region_id` within this query.
///
/// If `query_ctx` already holds a snapshot for the region, that value is returned and
/// `sampled_sequence` is ignored. Otherwise `sampled_sequence` is stored in the context
/// and returned.
fn bind_snapshot_bound_region_seq(
    query_ctx: &QueryContext,
    region_id: RegionId,
    sampled_sequence: u64,
) -> u64 {
    let region_key = region_id.as_u64();
    query_ctx.get_snapshot(region_key).unwrap_or_else(|| {
        // Nothing bound yet: record the sampled value so later lookups reuse it.
        query_ctx.set_snapshot(region_key, sampled_sequence);
        sampled_sequence
    })
}
/// Decides whether `snapshot_sequence` can be reported as a correctness watermark for
/// `region_id`, i.e. whether the query may treat it as its read upper bound.
///
/// When `engine` is a [`MitoEngine`] that knows the region, the watermark is accepted only
/// if `snapshot_sequence` is at least the region's flushed sequence. In every other case
/// (a different engine type, or a region the mito engine cannot find) the function
/// returns `true`.
///
/// Only memtable-side sequence information is consulted for now; an SST-side proof can be
/// added later if finer sequence metadata becomes available.
fn can_prove_region_watermark_for_engine(
    engine: &dyn RegionEngine,
    region_id: RegionId,
    snapshot_sequence: u64,
) -> bool {
    engine
        .as_any()
        .downcast_ref::<MitoEngine>()
        .and_then(|mito_engine| mito_engine.find_region(region_id))
        // No mito region to check against means there is nothing to disprove.
        .is_none_or(|region| snapshot_sequence >= region.flushed_sequence())
}
/// Wraps a region read stream so terminal metrics can carry the scan-open watermark.
struct RegionWatermarkStream {
stream: SendableRecordBatchStream,
region_id: u64,
snapshot_seq: u64,
proof_engine: Option<RegionEngineRef>,
finished: AtomicBool,
proof_state: Mutex<Option<bool>>,
}
impl RegionWatermarkStream {
fn new(
stream: SendableRecordBatchStream,
region_id: RegionId,
latest_sequence: u64,
proof_engine: Option<RegionEngineRef>,
) -> Self {
fn new(stream: SendableRecordBatchStream, region_id: RegionId, latest_sequence: u64) -> Self {
Self {
stream,
region_id: region_id.as_u64(),
snapshot_seq: latest_sequence,
proof_engine,
finished: AtomicBool::new(false),
proof_state: Mutex::new(None),
}
}
fn compute_proof(&self) -> bool {
self.proof_engine
.as_ref()
.map(|engine| {
can_prove_region_watermark_for_engine(
engine.as_ref(),
RegionId::from_u64(self.region_id),
self.snapshot_seq,
)
})
.unwrap_or(false)
}
fn proof_passed(&self) -> bool {
self.proof_state.lock().unwrap().unwrap_or(false)
}
fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics {
let entry = if let Some(entry) = metrics
.region_watermarks
@@ -880,12 +810,6 @@ impl RegionWatermarkStream {
metrics.region_watermarks.last_mut().unwrap()
};
// TODO(discord9): Move correctness-watermark proof into the mito scan
// path. The current stream-end proof is conservatively safe, but still
// allows false negatives if a late flush happens after the scan snapshot.
if !self.proof_passed() {
return metrics;
}
entry.watermark = Some(self.snapshot_seq);
metrics
}
@@ -920,7 +844,6 @@ impl Stream for RegionWatermarkStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(None) => {
*self.proof_state.lock().unwrap() = Some(self.compute_proof());
self.finished.store(true, Ordering::Relaxed);
Poll::Ready(None)
}
@@ -943,11 +866,24 @@ mod watermark_tests {
use mito2::config::MitoConfig;
use mito2::test_util::{self, CreateRequestBuilder, TestEnv};
use session::context::QueryContextBuilder;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use super::*;
use crate::tests::MockRegionEngine;
/// Test-only helper that binds a snapshot sequence to `region_id` at most once.
///
/// Returns the snapshot already stored in `query_ctx` for the region when there is one;
/// otherwise stores `sampled_sequence` and returns it.
#[cfg(test)]
fn bind_snapshot_bound_region_seq(
    query_ctx: &QueryContext,
    region_id: RegionId,
    sampled_sequence: u64,
) -> u64 {
    let region_key = region_id.as_u64();
    match query_ctx.get_snapshot(region_key) {
        Some(bound) => bound,
        None => {
            query_ctx.set_snapshot(region_key, sampled_sequence);
            sampled_sequence
        }
    }
}
#[test]
fn test_should_return_region_seq() {
let ctx = QueryContextBuilder::default()
@@ -1019,49 +955,6 @@ mod watermark_tests {
assert_eq!(ctx.get_snapshot(region_id.as_u64()), Some(99));
}
/// Checks that the watermark proof succeeds for an engine that is a `MockRegionEngine`.
#[test]
fn test_can_prove_region_watermark_for_non_mito_engine_defaults_true() {
// The mock is created with `MITO_ENGINE_NAME`, but its concrete type is `MockRegionEngine`;
// presumably the `MitoEngine` downcast in the helper fails for it — confirm.
let engine = MockRegionEngine::new(MITO_ENGINE_NAME).0;
assert!(can_prove_region_watermark_for_engine(
engine.as_ref(),
RegionId::new(1, 1),
42,
));
}
/// Checks that the watermark proof succeeds on a mito engine when the snapshot sequence
/// equals the region's flushed sequence.
#[tokio::test]
async fn test_can_prove_region_watermark_for_mito_engine_when_snapshot_ge_flushed() {
let mut env = TestEnv::with_prefix(
"test_can_prove_region_watermark_for_mito_engine_when_snapshot_ge_flushed",
)
.await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Write some rows and flush them before reading the flushed sequence.
let rows = Rows {
schema: column_schemas,
rows: test_util::build_rows(0, 3),
};
test_util::put_rows(&engine, region_id, rows).await;
test_util::flush_region(&engine, region_id, None).await;
// Use the flushed sequence itself as the snapshot: the boundary case of `>=`.
let region = engine.find_region(region_id).unwrap();
let snapshot_seq = region.flushed_sequence();
assert!(can_prove_region_watermark_for_engine(
&engine,
region_id,
snapshot_seq,
));
}
#[tokio::test]
async fn test_region_watermark_stream_only_sets_terminal_metrics() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
@@ -1076,8 +969,7 @@ mod watermark_tests {
.as_stream();
let region_id = RegionId::new(42, 7);
let proof_engine = Some(MockRegionEngine::new(MITO_ENGINE_NAME).0 as RegionEngineRef);
let wrapped = RegionWatermarkStream::new(stream, region_id, 99, proof_engine);
let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
let mut pinned = Box::pin(wrapped);
assert!(pinned.as_ref().get_ref().metrics().is_none());
@@ -1094,7 +986,7 @@ mod watermark_tests {
}
#[tokio::test]
async fn test_region_watermark_stream_without_proof_engine_omits_watermark() {
async fn test_region_watermark_stream_sets_watermark_from_snapshot() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"v",
ConcreteDataType::int32_datatype(),
@@ -1107,7 +999,7 @@ mod watermark_tests {
.as_stream();
let region_id = RegionId::new(42, 7);
let wrapped = RegionWatermarkStream::new(stream, region_id, 99, None);
let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
let mut pinned = Box::pin(wrapped);
while pinned.next().await.is_some() {}
@@ -1117,15 +1009,15 @@ mod watermark_tests {
metrics.region_watermarks,
vec![common_recordbatch::adapter::RegionWatermarkEntry {
region_id: region_id.as_u64(),
watermark: None,
watermark: Some(99),
}]
);
}
#[tokio::test]
async fn test_region_watermark_stream_omits_watermark_after_late_flush() {
async fn test_region_watermark_stream_keeps_watermark_after_late_flush() {
let mut env =
TestEnv::with_prefix("test_region_watermark_stream_omits_watermark_after_late_flush")
TestEnv::with_prefix("test_region_watermark_stream_keeps_watermark_after_late_flush")
.await;
let engine = env.create_engine(MitoConfig::default()).await;
@@ -1157,12 +1049,7 @@ mod watermark_tests {
.unwrap()
.as_stream();
let wrapped = RegionWatermarkStream::new(
stream,
region_id,
snapshot_seq,
Some(Arc::new(engine.clone()) as RegionEngineRef),
);
let wrapped = RegionWatermarkStream::new(stream, region_id, snapshot_seq);
let mut pinned = Box::pin(wrapped);
assert!(pinned.next().await.is_some());
@@ -1181,7 +1068,7 @@ mod watermark_tests {
metrics.region_watermarks,
vec![common_recordbatch::adapter::RegionWatermarkEntry {
region_id: region_id.as_u64(),
watermark: None,
watermark: Some(snapshot_seq),
}]
);
}

View File

@@ -94,7 +94,7 @@ impl RegionEngine for FileRegionEngine {
let stream = self.handle_query(region_id, request).await?;
let metadata = self.get_metadata(region_id).await?;
// We don't support enabling append mode for file engine.
let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata, None));
Ok(scanner)
}

View File

@@ -1012,11 +1012,16 @@ impl EngineInner {
/// Handles the scan `request` and returns a [ScanRegion].
#[tracing::instrument(skip_all, fields(region_id = %region_id))]
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
fn scan_region(&self, region_id: RegionId, mut request: ScanRequest) -> Result<ScanRegion> {
let query_start = Instant::now();
// Reading a region doesn't need to go through the region worker thread.
let region = self.find_region(region_id)?;
let version = region.version();
let version_data = region.version_control.current();
let version = version_data.version;
if request.snapshot_on_scan && request.memtable_max_sequence.is_none() {
request.memtable_max_sequence = Some(version_data.committed_sequence);
}
if let Some(given_seq) = request.memtable_min_sequence {
let min_readable_seq = version.flushed_sequence;

View File

@@ -169,6 +169,166 @@ async fn test_full_snapshot_upper_bound_does_not_constrain_sst_rows() {
assert!(pretty.contains("1970-01-01T00:00:04"));
}
/// Checks that a scan opened with `snapshot_on_scan` reports the committed sequence seen
/// at scanner creation and does not return rows written after the scanner was created.
#[tokio::test]
async fn test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open() {
let mut env =
TestEnv::with_prefix("test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open")
.await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// First batch: written before the scanner exists, so it must be visible to the scan.
let first_rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(0, 3),
};
test_util::put_rows(&engine, region_id, first_rows).await;
// Committed sequence right after the first write; the scanner is expected to bind to it.
let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap();
let scanner = engine
.scanner(
region_id,
ScanRequest {
snapshot_on_scan: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
// Second batch: written after the scanner was created but before the stream is read.
let second_rows = Rows {
schema: column_schemas,
rows: test_util::build_rows(3, 5),
};
test_util::put_rows(&engine, region_id, second_rows).await;
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
// Only the first batch is expected in the output.
assert_eq!(
batches.pretty_print().unwrap(),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+"
);
}
/// Checks that the snapshot sequence reported by an already-created scanner does not
/// change when more rows are written and the region is flushed afterwards.
#[tokio::test]
async fn test_snapshot_bound_query_keeps_open_snapshot_after_late_flush() {
let mut env =
TestEnv::with_prefix("test_snapshot_bound_query_keeps_open_snapshot_after_late_flush")
.await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let first_rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(0, 3),
};
test_util::put_rows(&engine, region_id, first_rows).await;
// Committed sequence before the scanner is created; used as the expected snapshot.
let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap();
let scanner = engine
.scanner(
region_id,
ScanRequest {
snapshot_on_scan: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
// Write and flush after the scanner exists (the "late flush").
let second_rows = Rows {
schema: column_schemas,
rows: test_util::build_rows(3, 5),
};
test_util::put_rows(&engine, region_id, second_rows).await;
test_util::flush_region(&engine, region_id, None).await;
// The scanner must still report the sequence it was created with.
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
}
/// Checks that a scanner created with `snapshot_on_scan` still returns only the rows
/// written before it was created, even when later rows are written and flushed before the
/// stream is read.
#[tokio::test]
async fn test_snapshot_bound_query_keeps_correct_result_after_late_flush() {
let mut env =
TestEnv::with_prefix("test_snapshot_bound_query_keeps_correct_result_after_late_flush")
.await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let first_rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(0, 3),
};
test_util::put_rows(&engine, region_id, first_rows).await;
// Committed sequence before the scanner is created; used as the expected snapshot.
let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap();
let scanner = engine
.scanner(
region_id,
ScanRequest {
snapshot_on_scan: true,
..Default::default()
},
)
.await
.unwrap();
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
// Write and flush after the scanner exists (the "late flush").
let second_rows = Rows {
schema: column_schemas,
rows: test_util::build_rows(3, 5),
};
test_util::put_rows(&engine, region_id, second_rows).await;
test_util::flush_region(&engine, region_id, None).await;
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
// Only the first batch is expected, although the second batch has been flushed by now.
assert_eq!(
batches.pretty_print().unwrap(),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+"
);
}
async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
let engine = env

View File

@@ -38,7 +38,8 @@ use snafu::{OptionExt as _, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate};
use tokio::sync::{Semaphore, mpsc};
@@ -149,6 +150,14 @@ impl Scanner {
}
}
/// Returns the `snapshot_sequence` stored in the input of the wrapped scan, whichever
/// scanner variant this is. `None` means no snapshot sequence was recorded on the input.
pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
match self {
Scanner::Seq(seq_scan) => seq_scan.input().snapshot_sequence,
Scanner::Unordered(unordered_scan) => unordered_scan.input().snapshot_sequence,
Scanner::Series(series_scan) => series_scan.input().snapshot_sequence,
}
}
/// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
use store_api::region_engine::{PrepareRequest, RegionScanner};
@@ -551,6 +560,12 @@ impl ScanRegion {
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution)
.with_snapshot_sequence(
self.request
.snapshot_on_scan
.then_some(self.request.memtable_max_sequence)
.flatten(),
)
.with_flat_format(flat_format);
#[cfg(feature = "vector_index")]
let input = input
@@ -856,6 +871,8 @@ pub struct ScanInput {
pub(crate) distribution: Option<TimeSeriesDistribution>,
/// Whether to use flat format.
pub(crate) flat_format: bool,
/// Snapshot upper bound fixed at scan open and propagated back to the caller.
pub(crate) snapshot_sequence: Option<SequenceNumber>,
/// Whether this scan is for compaction.
pub(crate) compaction: bool,
#[cfg(feature = "enterprise")]
@@ -893,6 +910,7 @@ impl ScanInput {
series_row_selector: None,
distribution: None,
flat_format: false,
snapshot_sequence: None,
compaction: false,
#[cfg(feature = "enterprise")]
extension_ranges: Vec::new(),
@@ -1065,6 +1083,15 @@ impl ScanInput {
self
}
/// Sets the snapshot sequence recorded on this scan input (`None` clears it).
#[must_use]
pub(crate) fn with_snapshot_sequence(
mut self,
snapshot_sequence: Option<SequenceNumber>,
) -> Self {
self.snapshot_sequence = snapshot_sequence;
self
}
/// Sets whether this scan is for compaction.
#[must_use]
pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {

View File

@@ -672,6 +672,10 @@ impl RegionScanner for SeqScan {
fn set_logical_region(&mut self, logical_region: bool) {
self.properties.set_logical_region(logical_region);
}
/// Returns the snapshot sequence stored on this scan's input.
// NOTE(review): the trait signature uses `Option<SequenceNumber>`; presumably
// `SequenceNumber` is an alias of `u64` — confirm and consider using the alias here.
fn snapshot_sequence(&self) -> Option<u64> {
self.stream_ctx.input.snapshot_sequence
}
}
impl DisplayAs for SeqScan {

View File

@@ -374,6 +374,10 @@ impl RegionScanner for SeriesScan {
fn set_logical_region(&mut self, logical_region: bool) {
self.properties.set_logical_region(logical_region);
}
/// Returns the snapshot sequence stored on this scan's input.
// NOTE(review): the trait signature uses `Option<SequenceNumber>`; presumably
// `SequenceNumber` is an alias of `u64` — confirm and consider using the alias here.
fn snapshot_sequence(&self) -> Option<u64> {
self.stream_ctx.input.snapshot_sequence
}
}
impl DisplayAs for SeriesScan {

View File

@@ -505,6 +505,10 @@ impl RegionScanner for UnorderedScan {
fn set_logical_region(&mut self, logical_region: bool) {
self.properties.set_logical_region(logical_region);
}
/// Returns the snapshot sequence stored on this scan's input.
// NOTE(review): the trait signature uses `Option<SequenceNumber>`; presumably
// `SequenceNumber` is an alias of `u64` — confirm and consider using the alias here.
fn snapshot_sequence(&self) -> Option<u64> {
self.stream_ctx.input.snapshot_sequence
}
}
impl DisplayAs for UnorderedScan {

View File

@@ -188,6 +188,12 @@ impl TableProvider for DummyTableProvider {
.handle_query(self.region_id, request.clone())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
if request.snapshot_on_scan
&& let Some(query_ctx) = &self.query_ctx
&& let Some(snapshot_sequence) = scanner.snapshot_sequence()
{
bind_snapshot_bound_region_seq(query_ctx, self.region_id, snapshot_sequence);
}
let query_memory_permit = self.engine.register_query_memory_permit();
let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?;
if let Some(query_ctx) = &self.query_ctx {
@@ -317,8 +323,8 @@ fn scan_request_from_query_context(
query_ctx: &QueryContext,
) -> Result<ScanRequest> {
let mut scan_request = ScanRequest {
memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()),
sst_min_sequence: query_ctx.sst_min_sequence(region_id.as_u64()),
snapshot_on_scan: query_requires_snapshot_bound(query_ctx),
..Default::default()
};
@@ -338,6 +344,33 @@ fn scan_request_from_query_context(
Ok(scan_request)
}
/// Reports whether this query needs a snapshot upper bound to be fixed when its scans open.
///
/// The answer comes from the flow extensions parsed out of the query context: it is
/// whatever `should_collect_region_watermark()` returns, or `false` when
/// `FlowQueryExtensions::from_extensions` yields no value.
fn query_requires_snapshot_bound(query_ctx: &QueryContext) -> bool {
    let extensions = query_ctx.extensions();
    FlowQueryExtensions::from_extensions(&extensions)
        .map(|flow_extensions| flow_extensions.should_collect_region_watermark())
        .unwrap_or(false)
}
/// Binds `snapshot_sequence` to `region_id` in the query context unless a snapshot is
/// already bound, and returns the sequence that is in effect afterwards.
///
/// The first bound value wins: when the context already holds a snapshot for the region
/// it is returned unchanged, and a warning is logged if it differs from
/// `snapshot_sequence`.
// NOTE(review): the lookup and the store are two separate calls on the context; whether
// concurrent scans of the same region can interleave between them is not visible here.
fn bind_snapshot_bound_region_seq(
    query_ctx: &QueryContext,
    region_id: RegionId,
    snapshot_sequence: u64,
) -> u64 {
    let Some(existing) = query_ctx.get_snapshot(region_id.as_u64()) else {
        // Nothing bound yet for this region: this scan's snapshot becomes the bound.
        query_ctx.set_snapshot(region_id.as_u64(), snapshot_sequence);
        return snapshot_sequence;
    };

    if existing != snapshot_sequence {
        common_telemetry::warn!(
            "conflicting snapshot sequence observed for region {} in a single query; keeping the first bound snapshot (existing={}, new={})",
            region_id,
            existing,
            snapshot_sequence,
        );
    }
    existing
}
#[async_trait]
impl TableProviderFactory for DummyTableProviderFactory {
async fn create(
@@ -471,7 +504,7 @@ impl CatalogManager for DummyCatalogManager {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
@@ -485,6 +518,48 @@ mod tests {
RegionId::new(1024, 1)
}
/// Checks that a context carrying `flow.return_region_seq` produces a scan request with
/// `snapshot_on_scan` set and no explicit memtable upper bound.
#[test]
fn test_scan_request_from_query_context_uses_snapshot_bound_intent() {
let region_id = test_region_id();
// The context already carries a snapshot (42) and an SST minimum sequence (7).
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([(
"flow.return_region_seq".to_string(),
"true".to_string(),
)]))
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
42_u64,
)]))))
.sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
7_u64,
)]))))
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert!(request.snapshot_on_scan);
// The pre-existing snapshot is not copied into the request; only the SST bound is.
assert_eq!(request.memtable_max_sequence, None);
assert_eq!(request.sst_min_sequence, Some(7));
}
/// Checks that a context carrying `flow.incremental_after_seqs` produces a scan request
/// with `snapshot_on_scan` set, the given memtable lower bound, and no explicit upper bound.
#[test]
fn test_scan_request_from_incremental_context_uses_snapshot_bound_intent() {
let region_id = test_region_id();
// The extension value is a JSON object mapping the region id to the sequence 10.
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([(
"flow.incremental_after_seqs".to_string(),
format!(r#"{{"{}":10}}"#, region_id.as_u64()),
)]))
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert!(request.snapshot_on_scan);
assert_eq!(request.memtable_min_sequence, Some(10));
assert_eq!(request.memtable_max_sequence, None);
}
#[test]
fn test_scan_request_from_query_context_keeps_snapshot_fields() {
let region_id = test_region_id();
@@ -500,9 +575,37 @@ mod tests {
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert_eq!(request.memtable_max_sequence, Some(100));
assert_eq!(request.memtable_max_sequence, None);
assert_eq!(request.sst_min_sequence, Some(90));
assert_eq!(request.memtable_min_sequence, None);
assert!(!request.snapshot_on_scan);
}
/// Checks that binding a snapshot for a region that already has one keeps the existing
/// value and returns it.
#[test]
fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
42_u64,
)]))))
.build();
// 99 differs from the stored 42; the stored value must win.
let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99);
assert_eq!(seq, 42);
assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(42));
}
/// Checks that binding a snapshot on a context built without any snapshots stores the
/// given value and returns it.
#[test]
fn test_bind_snapshot_bound_region_seq_sets_snapshot_once() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default().build();
let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99);
assert_eq!(seq, 99);
assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(99));
}
#[test]

View File

@@ -462,6 +462,10 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
/// Sets whether the scanner is reading a logical region.
fn set_logical_region(&mut self, logical_region: bool);
/// Returns the snapshot sequence associated with this scanner, if any.
///
/// The default implementation returns `None`; scanners that track a snapshot sequence
/// override it.
fn snapshot_sequence(&self) -> Option<SequenceNumber> {
None
}
}
pub type RegionScannerRef = Box<dyn RegionScanner>;
@@ -945,6 +949,7 @@ pub struct SinglePartitionScanner {
schema: SchemaRef,
properties: ScannerProperties,
metadata: RegionMetadataRef,
snapshot_sequence: Option<SequenceNumber>,
}
impl SinglePartitionScanner {
@@ -953,6 +958,7 @@ impl SinglePartitionScanner {
stream: SendableRecordBatchStream,
append_mode: bool,
metadata: RegionMetadataRef,
snapshot_sequence: Option<SequenceNumber>,
) -> Self {
let schema = stream.schema();
Self {
@@ -960,6 +966,7 @@ impl SinglePartitionScanner {
schema,
properties: ScannerProperties::default().with_append_mode(append_mode),
metadata,
snapshot_sequence,
}
}
}
@@ -1019,6 +1026,10 @@ impl RegionScanner for SinglePartitionScanner {
fn set_logical_region(&mut self, logical_region: bool) {
self.properties.set_logical_region(logical_region);
}
fn snapshot_sequence(&self) -> Option<SequenceNumber> {
self.snapshot_sequence
}
}
impl DisplayAs for SinglePartitionScanner {

View File

@@ -112,6 +112,8 @@ pub struct ScanRequest {
/// Optional constraint on the sequence number of the rows to read.
/// If set, only rows with a sequence number **less than or equal** to this value
/// will be returned.
/// This is the effective memtable upper bound used by the scan, whether provided
/// explicitly or bound on scan open.
pub memtable_max_sequence: Option<SequenceNumber>,
/// Optional constraint on the minimal sequence number in the memtable.
/// If set, only the memtables that contain sequences **greater than** this value will be scanned
@@ -119,6 +121,8 @@ pub struct ScanRequest {
/// Optional constraint on the minimal sequence number in the SST files.
/// If set, only the SST files that contain sequences greater than this value will be scanned.
pub sst_min_sequence: Option<SequenceNumber>,
/// Whether to bind the effective snapshot upper bound when opening the scan.
pub snapshot_on_scan: bool,
/// Optional hint for the distribution of time-series data.
pub distribution: Option<TimeSeriesDistribution>,
/// Optional hint for KNN vector search. When set, the scan should use
@@ -195,6 +199,14 @@ impl Display for ScanRequest {
sst_min_sequence
)?;
}
if self.snapshot_on_scan {
write!(
f,
"{}snapshot_on_scan: {}",
delimiter.as_str(),
self.snapshot_on_scan
)?;
}
if let Some(distribution) = &self.distribution {
write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
}
@@ -278,5 +290,14 @@ mod tests {
request.to_string(),
"ScanRequest { force_flat_format: true }"
);
let request = ScanRequest {
snapshot_on_scan: true,
..Default::default()
};
assert_eq!(
request.to_string(),
"ScanRequest { snapshot_on_scan: true }"
);
}
}

View File

@@ -599,7 +599,12 @@ mod test {
.primary_key(vec![1]);
let region_metadata = Arc::new(builder.build().unwrap());
let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata));
let scanner = Box::new(SinglePartitionScanner::new(
stream,
false,
region_metadata,
None,
));
let plan = RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap();
let actual: SchemaRef = Arc::new(
plan.properties