feat: add sequence scan boundaries and stale detection

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-08 15:05:16 +08:00
parent 2f8607138d
commit a7bef32bb0
9 changed files with 647 additions and 13 deletions

View File

@@ -129,8 +129,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
IncrementalQueryStaleSnafu, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu,
RegionNotFoundSnafu, Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
@@ -1013,11 +1013,29 @@ impl EngineInner {
/// Handles the scan `request` and returns a [ScanRegion].
#[tracing::instrument(skip_all, fields(region_id = %region_id))]
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
fn scan_region(&self, region_id: RegionId, mut request: ScanRequest) -> Result<ScanRegion> {
let query_start = Instant::now();
// Reading a region doesn't need to go through the region worker thread.
let region = self.find_region(region_id)?;
let version = region.version();
let version_data = region.version_control.current();
let version = version_data.version;
if request.snapshot_on_scan && request.memtable_max_sequence.is_none() {
request.memtable_max_sequence = Some(version_data.committed_sequence);
}
if let Some(given_seq) = request.memtable_min_sequence {
let min_readable_seq = version.flushed_sequence;
ensure!(
given_seq >= min_readable_seq,
IncrementalQueryStaleSnafu {
region_id,
given_seq,
min_readable_seq,
}
);
}
// Get cache.
let cache_manager = self.workers.cache_manager();

View File

@@ -27,16 +27,312 @@ use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution};
use crate::config::MitoConfig;
use crate::error::Error;
use crate::read::scan_region::Scanner;
use crate::test_util;
use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_incremental_query_stale_error() {
let mut env = TestEnv::with_prefix("test_incremental_query_stale_error").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(0, 3),
};
test_util::put_rows(&engine, region_id, rows).await;
test_util::flush_region(&engine, region_id, None).await;
let err = engine
.scanner(
region_id,
ScanRequest {
memtable_min_sequence: Some(0),
..Default::default()
},
)
.await
.err()
.expect("expect stale incremental error");
let min_readable_seq = match &err {
Error::IncrementalQueryStale {
region_id: err_region_id,
given_seq,
min_readable_seq,
..
} => {
assert_eq!(*err_region_id, region_id);
assert_eq!(*given_seq, 0);
assert!(*min_readable_seq > 0);
*min_readable_seq
}
_ => panic!("unexpected err: {err}"),
};
assert_eq!(StatusCode::RequestOutdated, err.status_code());
let err_msg = err.to_string();
assert!(err_msg.contains("STALE_CURSOR"));
assert!(err_msg.contains(&region_id.to_string()));
assert!(err_msg.contains("given_seq: 0"));
assert!(err_msg.contains(&format!("min_readable_seq: {min_readable_seq}")));
let incremental_rows = Rows {
schema: column_schemas,
rows: test_util::build_rows(3, 5),
};
test_util::put_rows(&engine, region_id, incremental_rows).await;
let scanner = engine
.scanner(
region_id,
ScanRequest {
memtable_min_sequence: Some(min_readable_seq),
sst_min_sequence: Some(u64::MAX),
..Default::default()
},
)
.await
.unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
batches.pretty_print().unwrap(),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
+-------+---------+---------------------+"
);
}
/// Runs the min-SST-sequence scan test against both SST formats.
#[tokio::test]
async fn test_scan_with_min_sst_sequence() {
    for flat_format in [false, true] {
        test_scan_with_min_sst_sequence_with_format(flat_format).await;
    }
}
/// Ensures that `memtable_max_sequence` only caps memtable reads: rows that
/// were flushed into SST files after the bound was taken are still visible.
#[tokio::test]
async fn test_full_snapshot_upper_bound_does_not_constrain_sst_rows() {
    let mut env =
        TestEnv::with_prefix("test_full_snapshot_upper_bound_does_not_constrain_sst_rows").await;
    let engine = env.create_engine(MitoConfig::default()).await;
    let region_id = RegionId::new(1, 1);
    let create = CreateRequestBuilder::new().build();
    let schema = test_util::rows_schema(&create);
    engine
        .handle_request(region_id, RegionRequest::Create(create))
        .await
        .unwrap();

    // First batch: written and flushed before the bound is captured.
    test_util::put_rows(
        &engine,
        region_id,
        Rows {
            schema: schema.clone(),
            rows: test_util::build_rows(0, 3),
        },
    )
    .await;
    test_util::flush_region(&engine, region_id, None).await;
    let snapshot_upper_bound = engine.get_committed_sequence(region_id).await.unwrap();

    // Second batch: written and flushed after the bound was taken.
    test_util::put_rows(
        &engine,
        region_id,
        Rows {
            schema,
            rows: test_util::build_rows(3, 5),
        },
    )
    .await;
    test_util::flush_region(&engine, region_id, None).await;

    let scanner = engine
        .scanner(
            region_id,
            ScanRequest {
                memtable_max_sequence: Some(snapshot_upper_bound),
                ..Default::default()
            },
        )
        .await
        .unwrap();
    let batches = RecordBatches::try_collect(scanner.scan().await.unwrap())
        .await
        .unwrap();
    let pretty = batches.pretty_print().unwrap();
    // The post-bound rows live in SST files, so the memtable-only upper
    // bound must not hide them.
    assert!(pretty.contains("1970-01-01T00:00:03"));
    assert!(pretty.contains("1970-01-01T00:00:04"));
}
/// Checks that `snapshot_on_scan` pins the memtable upper bound when the
/// scanner is opened: rows written afterwards are invisible to that scan.
#[tokio::test]
async fn test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open() {
    let mut env =
        TestEnv::with_prefix("test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open")
            .await;
    let engine = env.create_engine(MitoConfig::default()).await;
    let region_id = RegionId::new(1, 1);
    let create = CreateRequestBuilder::new().build();
    let schema = test_util::rows_schema(&create);
    engine
        .handle_request(region_id, RegionRequest::Create(create))
        .await
        .unwrap();

    // Rows that should be covered by the snapshot.
    test_util::put_rows(
        &engine,
        region_id,
        Rows {
            schema: schema.clone(),
            rows: test_util::build_rows(0, 3),
        },
    )
    .await;
    let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap();

    // Opening the scanner with `snapshot_on_scan` binds the bound now.
    let scanner = engine
        .scanner(
            region_id,
            ScanRequest {
                snapshot_on_scan: true,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));

    // Rows written after the scanner was opened must not show up.
    test_util::put_rows(
        &engine,
        region_id,
        Rows {
            schema,
            rows: test_util::build_rows(3, 5),
        },
    )
    .await;
    let batches = RecordBatches::try_collect(scanner.scan().await.unwrap())
        .await
        .unwrap();
    assert_eq!(
        batches.pretty_print().unwrap(),
        "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+"
    );
}
/// Checks that the snapshot sequence bound at scan open does not move when
/// later writes are flushed while the scanner is still open.
#[tokio::test]
async fn test_snapshot_bound_query_keeps_open_snapshot_after_late_flush() {
    let mut env =
        TestEnv::with_prefix("test_snapshot_bound_query_keeps_open_snapshot_after_late_flush")
            .await;
    let engine = env.create_engine(MitoConfig::default()).await;
    let region_id = RegionId::new(1, 1);
    let create = CreateRequestBuilder::new().build();
    let schema = test_util::rows_schema(&create);
    engine
        .handle_request(region_id, RegionRequest::Create(create))
        .await
        .unwrap();

    // Rows covered by the snapshot bound.
    test_util::put_rows(
        &engine,
        region_id,
        Rows {
            schema: schema.clone(),
            rows: test_util::build_rows(0, 3),
        },
    )
    .await;
    let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap();

    let scanner = engine
        .scanner(
            region_id,
            ScanRequest {
                snapshot_on_scan: true,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));

    // Write and flush after the scanner is open; the bound must not change.
    test_util::put_rows(
        &engine,
        region_id,
        Rows {
            schema,
            rows: test_util::build_rows(3, 5),
        },
    )
    .await;
    test_util::flush_region(&engine, region_id, None).await;
    assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
}
/// Checks that a snapshot bound captured at scan open still yields exactly
/// the pre-bound rows even after later writes were flushed into SST files.
#[tokio::test]
async fn test_snapshot_bound_query_keeps_correct_result_after_late_flush() {
    let mut env =
        TestEnv::with_prefix("test_snapshot_bound_query_keeps_correct_result_after_late_flush")
            .await;
    let engine = env.create_engine(MitoConfig::default()).await;
    let region_id = RegionId::new(1, 1);
    let create = CreateRequestBuilder::new().build();
    let schema = test_util::rows_schema(&create);
    engine
        .handle_request(region_id, RegionRequest::Create(create))
        .await
        .unwrap();

    // Rows covered by the snapshot bound.
    test_util::put_rows(
        &engine,
        region_id,
        Rows {
            schema: schema.clone(),
            rows: test_util::build_rows(0, 3),
        },
    )
    .await;
    let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap();

    let scanner = engine
        .scanner(
            region_id,
            ScanRequest {
                snapshot_on_scan: true,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));

    // Write more rows and flush; the open scanner keeps its bound...
    test_util::put_rows(
        &engine,
        region_id,
        Rows {
            schema,
            rows: test_util::build_rows(3, 5),
        },
    )
    .await;
    test_util::flush_region(&engine, region_id, None).await;
    assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));

    // ...and the scan result contains only the pre-bound rows.
    let batches = RecordBatches::try_collect(scanner.scan().await.unwrap())
        .await
        .unwrap();
    assert_eq!(
        batches.pretty_print().unwrap(),
        "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+"
    );
}
async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
let engine = env

View File

@@ -228,6 +228,20 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"STALE_CURSOR: incremental query stale, region: {}, given_seq: {}, min_readable_seq: {}, retry_hint: FALLBACK_FULL_RECOMPUTE",
region_id,
given_seq,
min_readable_seq
))]
IncrementalQueryStale {
region_id: RegionId,
given_seq: u64,
min_readable_seq: u64,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Old manifest missing for region {}", region_id))]
MissingOldManifest {
region_id: RegionId,
@@ -1295,6 +1309,8 @@ impl ErrorExt for Error {
| SerializePartitionExpr { .. }
| InvalidSourceAndTargetRegion { .. } => StatusCode::InvalidArguments,
IncrementalQueryStale { .. } => StatusCode::RequestOutdated,
RegionMetadataNotFound { .. }
| Join { .. }
| WorkerStopped { .. }

View File

@@ -38,7 +38,8 @@ use snafu::{OptionExt as _, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
use tokio::sync::{Semaphore, mpsc};
@@ -147,6 +148,14 @@ impl Scanner {
}
}
/// Returns the snapshot sequence bound recorded in the scan input when the
/// scan was opened, regardless of which scanner variant is in use.
pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
    let input = match self {
        Scanner::Seq(scan) => scan.input(),
        Scanner::Unordered(scan) => scan.input(),
        Scanner::Series(scan) => scan.input(),
    };
    input.snapshot_sequence
}
/// Sets the target partitions for the scanner. It can controls the parallelism of the scanner.
pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
use store_api::region_engine::{PrepareRequest, RegionScanner};
@@ -523,6 +532,12 @@ impl ScanRegion {
.with_distribution(self.request.distribution)
.with_explain_flat_format(
self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
)
.with_snapshot_sequence(
self.request
.snapshot_on_scan
.then_some(self.request.memtable_max_sequence)
.flatten(),
);
#[cfg(feature = "vector_index")]
let input = input
@@ -826,6 +841,8 @@ pub struct ScanInput {
pub(crate) distribution: Option<TimeSeriesDistribution>,
/// Whether the region's configured SST format is flat.
explain_flat_format: bool,
/// Snapshot upper bound, fixed at scan open and propagated back to the caller.
pub(crate) snapshot_sequence: Option<SequenceNumber>,
/// Whether this scan is for compaction.
pub(crate) compaction: bool,
#[cfg(feature = "enterprise")]
@@ -862,6 +879,7 @@ impl ScanInput {
series_row_selector: None,
distribution: None,
explain_flat_format: false,
snapshot_sequence: None,
compaction: false,
#[cfg(feature = "enterprise")]
extension_ranges: Vec::new(),
@@ -1024,6 +1042,15 @@ impl ScanInput {
self
}
/// Sets the snapshot sequence bound captured at scan open; `None` means the
/// scan is not snapshot-bound.
#[must_use]
pub(crate) fn with_snapshot_sequence(
mut self,
snapshot_sequence: Option<SequenceNumber>,
) -> Self {
self.snapshot_sequence = snapshot_sequence;
self
}
/// Sets whether this scan is for compaction.
#[must_use]
pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {

View File

@@ -511,6 +511,10 @@ impl RegionScanner for SeqScan {
fn set_logical_region(&mut self, logical_region: bool) {
self.properties.set_logical_region(logical_region);
}
/// Returns the snapshot sequence bound stored in this scan's input, if any.
fn snapshot_sequence(&self) -> Option<u64> {
self.stream_ctx.input.snapshot_sequence
}
}
impl DisplayAs for SeqScan {

View File

@@ -379,6 +379,10 @@ impl RegionScanner for SeriesScan {
fn set_logical_region(&mut self, logical_region: bool) {
self.properties.set_logical_region(logical_region);
}
/// Returns the snapshot sequence bound stored in this scan's input, if any.
fn snapshot_sequence(&self) -> Option<u64> {
self.stream_ctx.input.snapshot_sequence
}
}
impl DisplayAs for SeriesScan {

View File

@@ -351,6 +351,10 @@ impl RegionScanner for UnorderedScan {
fn set_logical_region(&mut self, logical_region: bool) {
self.properties.set_logical_region(logical_region);
}
/// Returns the snapshot sequence bound stored in this scan's input, if any.
fn snapshot_sequence(&self) -> Option<u64> {
self.stream_ctx.input.snapshot_sequence
}
}
impl DisplayAs for UnorderedScan {

View File

@@ -318,6 +318,10 @@ impl MitoRegion {
self.version_control.committed_sequence()
}
/// Returns the flushed sequence recorded in the region's current version.
pub fn flushed_sequence(&self) -> SequenceNumber {
self.version_control.current().version.flushed_sequence
}
/// Returns whether the region is readonly.
pub fn is_follower(&self) -> bool {
self.manifest_ctx.state.load() == RegionRoleState::Follower

View File

@@ -43,6 +43,7 @@ use table::metadata::{TableId, TableInfoRef};
use table::table::scan::RegionScanExec;
use crate::error::{GetRegionMetadataSnafu, Result};
use crate::options::FlowQueryExtensions;
/// Resolve to the given region (specified by [RegionId]) unconditionally.
#[derive(Clone, Debug)]
@@ -187,6 +188,14 @@ impl TableProvider for DummyTableProvider {
.handle_query(self.region_id, request.clone())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
if request.snapshot_on_scan
&& let Some(query_ctx) = &self.query_ctx
&& let Some(snapshot_sequence) = scanner.snapshot_sequence()
{
bind_snapshot_bound_region_seq(query_ctx, self.region_id, snapshot_sequence);
}
let query_memory_tracker = self.engine.query_memory_tracker();
let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_tracker)?;
if let Some(query_ctx) = &self.query_ctx {
@@ -295,14 +304,11 @@ impl DummyTableProviderFactory {
region_id,
})?;
let scan_request = query_ctx
.as_ref()
.map(|ctx| ScanRequest {
memtable_max_sequence: ctx.get_snapshot(region_id.as_u64()),
sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
..Default::default()
})
.unwrap_or_default();
let scan_request = if let Some(ctx) = query_ctx.as_ref() {
scan_request_from_query_context(region_id, ctx)?
} else {
ScanRequest::default()
};
Ok(DummyTableProvider {
region_id,
@@ -314,6 +320,59 @@ impl DummyTableProviderFactory {
}
}
/// Builds the [ScanRequest] for `region_id` from the flow-related settings
/// carried in `query_ctx`.
///
/// Parses the flow query extensions exactly once and fails fast on invalid
/// extensions. The previous version parsed them twice: once swallowing
/// errors (to compute `snapshot_on_scan`) and once with `?`; the observable
/// `Result` is unchanged, only the redundant parse is removed.
///
/// # Errors
/// Returns an error when the flow extensions are malformed or the
/// incremental settings are invalid for scanning this region.
fn scan_request_from_query_context(
    region_id: RegionId,
    query_ctx: &QueryContext,
) -> Result<ScanRequest> {
    // Single parse of the flow extensions; malformed extensions are an error.
    let flow_extensions = FlowQueryExtensions::from_extensions(&query_ctx.extensions())?;
    let mut scan_request = ScanRequest {
        sst_min_sequence: query_ctx.sst_min_sequence(region_id.as_u64()),
        // Queries that collect region watermarks need a stable snapshot
        // bound taken when the scan opens.
        snapshot_on_scan: flow_extensions.should_collect_region_watermark(),
        ..Default::default()
    };
    // Only scans validated as incremental source scans narrow the memtable
    // range to rows after the recorded sequence.
    let should_apply_incremental = flow_extensions.validate_for_scan(region_id)?;
    if should_apply_incremental
        && let Some(after_seq) = flow_extensions
            .incremental_after_seqs
            .as_ref()
            .and_then(|seqs| seqs.get(&region_id.as_u64()))
            .copied()
    {
        scan_request.memtable_min_sequence = Some(after_seq);
    }
    Ok(scan_request)
}
/// Returns whether the query asks for a snapshot bound at scan open, i.e.
/// its flow extensions request region watermark collection. Unparsable
/// extensions are treated as "no snapshot bound required".
fn query_requires_snapshot_bound(query_ctx: &QueryContext) -> bool {
    match FlowQueryExtensions::from_extensions(&query_ctx.extensions()) {
        Ok(extensions) => extensions.should_collect_region_watermark(),
        Err(_) => false,
    }
}
/// Records `snapshot_sequence` as the snapshot bound for `region_id` in the
/// query context, keeping an already-bound value when one exists, and
/// returns the sequence that is effectively bound.
// NOTE(review): the get-then-set below is not atomic; two concurrent scans
// of the same region could both observe "unbound" — confirm callers
// serialize per-region scan setup or that QueryContext has a get-or-insert.
fn bind_snapshot_bound_region_seq(
    query_ctx: &QueryContext,
    region_id: RegionId,
    snapshot_sequence: u64,
) -> u64 {
    match query_ctx.get_snapshot(region_id.as_u64()) {
        Some(existing) => {
            // Keep the first bound snapshot; a different value later in the
            // same query is logged as an inconsistency.
            if existing != snapshot_sequence {
                common_telemetry::warn!(
                    "conflicting snapshot sequence observed for region {} in a single query; keeping the first bound snapshot (existing={}, new={})",
                    region_id,
                    existing,
                    snapshot_sequence,
                );
            }
            existing
        }
        None => {
            query_ctx.set_snapshot(region_id.as_u64(), snapshot_sequence);
            snapshot_sequence
        }
    }
}
#[async_trait]
impl TableProviderFactory for DummyTableProviderFactory {
async fn create(
@@ -443,3 +502,205 @@ impl CatalogManager for DummyCatalogManager {
Box::pin(futures::stream::empty())
}
}
// Unit tests for building scan requests from query-context flow extensions
// and for binding snapshot sequences into the query context.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use session::context::QueryContextBuilder;
use super::*;
use crate::error::Error;
use crate::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_SINK_TABLE_ID};
// Shared fixture region id used by every test below.
fn test_region_id() -> RegionId {
RegionId::new(1024, 1)
}
// A watermark-collecting query requests a snapshot bound at scan open and
// does NOT copy a pre-existing snapshot seq into `memtable_max_sequence`.
#[test]
fn test_scan_request_from_query_context_uses_snapshot_bound_intent() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([(
"flow.return_region_seq".to_string(),
"true".to_string(),
)]))
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
42_u64,
)]))))
.sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
7_u64,
)]))))
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert!(request.snapshot_on_scan);
assert_eq!(request.memtable_max_sequence, None);
assert_eq!(request.sst_min_sequence, Some(7));
}
// Incremental flow queries also request a snapshot bound and set the
// memtable lower bound from the per-region "after" sequence.
#[test]
fn test_scan_request_from_incremental_context_uses_snapshot_bound_intent() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([(
"flow.incremental_after_seqs".to_string(),
format!(r#"{{"{}":10}}"#, region_id.as_u64()),
)]))
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert!(request.snapshot_on_scan);
assert_eq!(request.memtable_min_sequence, Some(10));
assert_eq!(request.memtable_max_sequence, None);
}
// Without flow extensions, only `sst_min_sequence` is taken from the
// context; no snapshot bound or memtable bounds are requested.
#[test]
fn test_scan_request_from_query_context_keeps_snapshot_fields() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
100,
)]))))
.sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
90,
)]))))
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert_eq!(request.memtable_max_sequence, None);
assert_eq!(request.sst_min_sequence, Some(90));
assert_eq!(request.memtable_min_sequence, None);
assert!(!request.snapshot_on_scan);
}
// An already-bound snapshot wins over a newly observed sequence.
#[test]
fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
42_u64,
)]))))
.build();
let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99);
assert_eq!(seq, 42);
assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(42));
}
// With no prior binding, the observed sequence is stored and returned.
#[test]
fn test_bind_snapshot_bound_region_seq_sets_snapshot_once() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default().build();
let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99);
assert_eq!(seq, 99);
assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(99));
}
// Source-table scans in memtable-only incremental mode apply the "after"
// sequence as the memtable lower bound.
#[test]
fn test_scan_request_from_query_context_applies_incremental_after_seq_for_source_scan() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([
(
FLOW_INCREMENTAL_MODE.to_string(),
"memtable_only".to_string(),
),
(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
format!(r#"{{"{}":55}}"#, region_id.as_u64()),
),
]))
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert_eq!(request.memtable_min_sequence, Some(55));
}
// Scans of the flow's sink table must not be narrowed incrementally even
// when an "after" sequence exists for the region.
#[test]
fn test_scan_request_from_query_context_does_not_apply_incremental_for_sink_table() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([
(
FLOW_INCREMENTAL_MODE.to_string(),
"memtable_only".to_string(),
),
(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
format!(r#"{{"{}":55}}"#, region_id.as_u64()),
),
(
FLOW_SINK_TABLE_ID.to_string(),
region_id.table_id().to_string(),
),
]))
.build();
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
assert_eq!(request.memtable_min_sequence, None);
}
// Memtable-only mode with no "after" entry for the scanned region is an
// invalid extension combination.
#[test]
fn test_scan_request_from_query_context_rejects_missing_memtable_only_region() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([
(
FLOW_INCREMENTAL_MODE.to_string(),
"memtable_only".to_string(),
),
(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
r#"{"9":55}"#.to_string(),
),
]))
.build();
let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
}
// Malformed JSON in the incremental-after-seqs extension is rejected as
// invalid arguments.
#[test]
fn test_scan_request_from_query_context_rejects_invalid_incremental_json() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
"not-json".to_string(),
)]))
.build();
let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}
// A non-numeric sink table id is rejected as invalid arguments.
#[test]
fn test_scan_request_from_query_context_rejects_invalid_sink_table_id() {
let region_id = test_region_id();
let query_ctx = QueryContextBuilder::default()
.extensions(HashMap::from([(
FLOW_SINK_TABLE_ID.to_string(),
"abc".to_string(),
)]))
.build();
let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}
}