mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 03:50:39 +00:00
feat(flow): inc query scan bind seq (#7879)
* feat: add sequence scan boundaries and stale detection Signed-off-by: discord9 <discord9@163.com> * fix: only scan bind once Signed-off-by: discord9 <discord9@163.com> * refactor: flow snapshot decision Signed-off-by: discord9 <discord9@163.com> * refactor: one place modify Signed-off-by: discord9 <discord9@163.com> * chore: per review Signed-off-by: discord9 <discord9@163.com> * refactor: pre review Signed-off-by: discord9 <discord9@163.com> * per review Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -129,8 +129,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy};
|
||||
use crate::config::MitoConfig;
|
||||
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
|
||||
use crate::error::{
|
||||
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
|
||||
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
|
||||
IncrementalQueryStaleSnafu, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu,
|
||||
RegionNotFoundSnafu, Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu,
|
||||
};
|
||||
#[cfg(feature = "enterprise")]
|
||||
use crate::extension::BoxedExtensionRangeProviderFactory;
|
||||
@@ -1013,11 +1013,29 @@ impl EngineInner {
|
||||
|
||||
/// Handles the scan `request` and returns a [ScanRegion].
|
||||
#[tracing::instrument(skip_all, fields(region_id = %region_id))]
|
||||
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
|
||||
fn scan_region(&self, region_id: RegionId, mut request: ScanRequest) -> Result<ScanRegion> {
|
||||
let query_start = Instant::now();
|
||||
// Reading a region doesn't need to go through the region worker thread.
|
||||
let region = self.find_region(region_id)?;
|
||||
let version = region.version();
|
||||
let version_data = region.version_control.current();
|
||||
let version = version_data.version;
|
||||
|
||||
if request.snapshot_on_scan && request.memtable_max_sequence.is_none() {
|
||||
request.memtable_max_sequence = Some(version_data.committed_sequence);
|
||||
}
|
||||
|
||||
if let Some(given_seq) = request.memtable_min_sequence {
|
||||
let min_readable_seq = version.flushed_sequence;
|
||||
ensure!(
|
||||
given_seq >= min_readable_seq,
|
||||
IncrementalQueryStaleSnafu {
|
||||
region_id,
|
||||
given_seq,
|
||||
min_readable_seq,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Get cache.
|
||||
let cache_manager = self.workers.cache_manager();
|
||||
|
||||
|
||||
@@ -27,16 +27,312 @@ use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::Error;
|
||||
use crate::read::scan_region::Scanner;
|
||||
use crate::test_util;
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_incremental_query_stale_error() {
|
||||
let mut env = TestEnv::with_prefix("test_incremental_query_stale_error").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
let column_schemas = test_util::rows_schema(&request);
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(0, 3),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, rows).await;
|
||||
test_util::flush_region(&engine, region_id, None).await;
|
||||
|
||||
let err = engine
|
||||
.scanner(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
memtable_min_sequence: Some(0),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.err()
|
||||
.expect("expect stale incremental error");
|
||||
|
||||
let min_readable_seq = match &err {
|
||||
Error::IncrementalQueryStale {
|
||||
region_id: err_region_id,
|
||||
given_seq,
|
||||
min_readable_seq,
|
||||
..
|
||||
} => {
|
||||
assert_eq!(*err_region_id, region_id);
|
||||
assert_eq!(*given_seq, 0);
|
||||
assert!(*min_readable_seq > 0);
|
||||
*min_readable_seq
|
||||
}
|
||||
_ => panic!("unexpected err: {err}"),
|
||||
};
|
||||
assert_eq!(StatusCode::RequestOutdated, err.status_code());
|
||||
let err_msg = err.to_string();
|
||||
assert!(err_msg.contains("STALE_CURSOR"));
|
||||
assert!(err_msg.contains(®ion_id.to_string()));
|
||||
assert!(err_msg.contains("given_seq: 0"));
|
||||
assert!(err_msg.contains(&format!("min_readable_seq: {min_readable_seq}")));
|
||||
|
||||
let incremental_rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: test_util::build_rows(3, 5),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, incremental_rows).await;
|
||||
|
||||
let scanner = engine
|
||||
.scanner(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
memtable_min_sequence: Some(min_readable_seq),
|
||||
sst_min_sequence: Some(u64::MAX),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
assert_eq!(
|
||||
batches.pretty_print().unwrap(),
|
||||
"\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 3 | 3.0 | 1970-01-01T00:00:03 |
|
||||
| 4 | 4.0 | 1970-01-01T00:00:04 |
|
||||
+-------+---------+---------------------+"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scan_with_min_sst_sequence() {
|
||||
test_scan_with_min_sst_sequence_with_format(false).await;
|
||||
test_scan_with_min_sst_sequence_with_format(true).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_full_snapshot_upper_bound_does_not_constrain_sst_rows() {
|
||||
let mut env =
|
||||
TestEnv::with_prefix("test_full_snapshot_upper_bound_does_not_constrain_sst_rows").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
let column_schemas = test_util::rows_schema(&request);
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let first_rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(0, 3),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, first_rows).await;
|
||||
test_util::flush_region(&engine, region_id, None).await;
|
||||
|
||||
let snapshot_upper_bound = engine.get_committed_sequence(region_id).await.unwrap();
|
||||
|
||||
let second_rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: test_util::build_rows(3, 5),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, second_rows).await;
|
||||
test_util::flush_region(&engine, region_id, None).await;
|
||||
|
||||
let scanner = engine
|
||||
.scanner(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
memtable_max_sequence: Some(snapshot_upper_bound),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let pretty = batches.pretty_print().unwrap();
|
||||
|
||||
assert!(pretty.contains("1970-01-01T00:00:03"));
|
||||
assert!(pretty.contains("1970-01-01T00:00:04"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open() {
|
||||
let mut env =
|
||||
TestEnv::with_prefix("test_snapshot_bound_query_binds_memtable_upper_bound_at_scan_open")
|
||||
.await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
let column_schemas = test_util::rows_schema(&request);
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let first_rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(0, 3),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, first_rows).await;
|
||||
|
||||
let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap();
|
||||
let scanner = engine
|
||||
.scanner(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
snapshot_on_scan: true,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
|
||||
|
||||
let second_rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: test_util::build_rows(3, 5),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, second_rows).await;
|
||||
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
assert_eq!(
|
||||
batches.pretty_print().unwrap(),
|
||||
"\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 0 | 0.0 | 1970-01-01T00:00:00 |
|
||||
| 1 | 1.0 | 1970-01-01T00:00:01 |
|
||||
| 2 | 2.0 | 1970-01-01T00:00:02 |
|
||||
+-------+---------+---------------------+"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_snapshot_bound_query_keeps_open_snapshot_after_late_flush() {
|
||||
let mut env =
|
||||
TestEnv::with_prefix("test_snapshot_bound_query_keeps_open_snapshot_after_late_flush")
|
||||
.await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
let column_schemas = test_util::rows_schema(&request);
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let first_rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(0, 3),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, first_rows).await;
|
||||
|
||||
let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap();
|
||||
let scanner = engine
|
||||
.scanner(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
snapshot_on_scan: true,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
|
||||
|
||||
let second_rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: test_util::build_rows(3, 5),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, second_rows).await;
|
||||
test_util::flush_region(&engine, region_id, None).await;
|
||||
|
||||
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_snapshot_bound_query_keeps_correct_result_after_late_flush() {
|
||||
let mut env =
|
||||
TestEnv::with_prefix("test_snapshot_bound_query_keeps_correct_result_after_late_flush")
|
||||
.await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
let column_schemas = test_util::rows_schema(&request);
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let first_rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(0, 3),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, first_rows).await;
|
||||
|
||||
let expected_snapshot = engine.get_committed_sequence(region_id).await.unwrap();
|
||||
let scanner = engine
|
||||
.scanner(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
snapshot_on_scan: true,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
|
||||
|
||||
let second_rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: test_util::build_rows(3, 5),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, second_rows).await;
|
||||
test_util::flush_region(&engine, region_id, None).await;
|
||||
|
||||
assert_eq!(scanner.snapshot_sequence(), Some(expected_snapshot));
|
||||
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
assert_eq!(
|
||||
batches.pretty_print().unwrap(),
|
||||
"\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 0 | 0.0 | 1970-01-01T00:00:00 |
|
||||
| 1 | 1.0 | 1970-01-01T00:00:01 |
|
||||
| 2 | 2.0 | 1970-01-01T00:00:02 |
|
||||
+-------+---------+---------------------+"
|
||||
);
|
||||
}
|
||||
|
||||
async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
|
||||
let engine = env
|
||||
|
||||
@@ -228,6 +228,20 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"STALE_CURSOR: incremental query stale, region: {}, given_seq: {}, min_readable_seq: {}, retry_hint: FALLBACK_FULL_RECOMPUTE",
|
||||
region_id,
|
||||
given_seq,
|
||||
min_readable_seq
|
||||
))]
|
||||
IncrementalQueryStale {
|
||||
region_id: RegionId,
|
||||
given_seq: u64,
|
||||
min_readable_seq: u64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Old manifest missing for region {}", region_id))]
|
||||
MissingOldManifest {
|
||||
region_id: RegionId,
|
||||
@@ -1298,6 +1312,8 @@ impl ErrorExt for Error {
|
||||
| SerializePartitionExpr { .. }
|
||||
| InvalidSourceAndTargetRegion { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
IncrementalQueryStale { .. } => StatusCode::RequestOutdated,
|
||||
|
||||
RegionMetadataNotFound { .. }
|
||||
| Join { .. }
|
||||
| WorkerStopped { .. }
|
||||
|
||||
@@ -38,7 +38,8 @@ use snafu::{OptionExt as _, ResultExt};
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use store_api::region_engine::{PartitionRange, RegionScannerRef};
|
||||
use store_api::storage::{
|
||||
ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
|
||||
ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
|
||||
TimeSeriesRowSelector,
|
||||
};
|
||||
use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
|
||||
use tokio::sync::{Semaphore, mpsc};
|
||||
@@ -147,6 +148,14 @@ impl Scanner {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
|
||||
match self {
|
||||
Scanner::Seq(seq_scan) => seq_scan.input().snapshot_sequence,
|
||||
Scanner::Unordered(unordered_scan) => unordered_scan.input().snapshot_sequence,
|
||||
Scanner::Series(series_scan) => series_scan.input().snapshot_sequence,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the target partitions for the scanner. It can controls the parallelism of the scanner.
|
||||
pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
|
||||
use store_api::region_engine::{PrepareRequest, RegionScanner};
|
||||
@@ -523,6 +532,12 @@ impl ScanRegion {
|
||||
.with_distribution(self.request.distribution)
|
||||
.with_explain_flat_format(
|
||||
self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
|
||||
)
|
||||
.with_snapshot_sequence(
|
||||
self.request
|
||||
.snapshot_on_scan
|
||||
.then_some(self.request.memtable_max_sequence)
|
||||
.flatten(),
|
||||
);
|
||||
#[cfg(feature = "vector_index")]
|
||||
let input = input
|
||||
@@ -826,6 +841,8 @@ pub struct ScanInput {
|
||||
pub(crate) distribution: Option<TimeSeriesDistribution>,
|
||||
/// Whether the region's configured SST format is flat.
|
||||
explain_flat_format: bool,
|
||||
/// Snapshot upper bound bound at scan open and propagated back to the caller.
|
||||
pub(crate) snapshot_sequence: Option<SequenceNumber>,
|
||||
/// Whether this scan is for compaction.
|
||||
pub(crate) compaction: bool,
|
||||
#[cfg(feature = "enterprise")]
|
||||
@@ -862,6 +879,7 @@ impl ScanInput {
|
||||
series_row_selector: None,
|
||||
distribution: None,
|
||||
explain_flat_format: false,
|
||||
snapshot_sequence: None,
|
||||
compaction: false,
|
||||
#[cfg(feature = "enterprise")]
|
||||
extension_ranges: Vec::new(),
|
||||
@@ -1024,6 +1042,15 @@ impl ScanInput {
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub(crate) fn with_snapshot_sequence(
|
||||
mut self,
|
||||
snapshot_sequence: Option<SequenceNumber>,
|
||||
) -> Self {
|
||||
self.snapshot_sequence = snapshot_sequence;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets whether this scan is for compaction.
|
||||
#[must_use]
|
||||
pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
|
||||
|
||||
@@ -572,6 +572,10 @@ impl RegionScanner for SeqScan {
|
||||
fn set_logical_region(&mut self, logical_region: bool) {
|
||||
self.properties.set_logical_region(logical_region);
|
||||
}
|
||||
|
||||
fn snapshot_sequence(&self) -> Option<u64> {
|
||||
self.stream_ctx.input.snapshot_sequence
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for SeqScan {
|
||||
|
||||
@@ -380,6 +380,10 @@ impl RegionScanner for SeriesScan {
|
||||
fn set_logical_region(&mut self, logical_region: bool) {
|
||||
self.properties.set_logical_region(logical_region);
|
||||
}
|
||||
|
||||
fn snapshot_sequence(&self) -> Option<u64> {
|
||||
self.stream_ctx.input.snapshot_sequence
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for SeriesScan {
|
||||
|
||||
@@ -351,6 +351,10 @@ impl RegionScanner for UnorderedScan {
|
||||
fn set_logical_region(&mut self, logical_region: bool) {
|
||||
self.properties.set_logical_region(logical_region);
|
||||
}
|
||||
|
||||
fn snapshot_sequence(&self) -> Option<u64> {
|
||||
self.stream_ctx.input.snapshot_sequence
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for UnorderedScan {
|
||||
|
||||
@@ -313,6 +313,15 @@ impl MitoRegion {
|
||||
self.version_control.committed_sequence()
|
||||
}
|
||||
|
||||
/// Returns the latest sequence that has already been persisted into SSTs.
|
||||
///
|
||||
/// Incremental memtable-only reads must use a cursor greater than or equal to
|
||||
/// this boundary; older cursors are stale because the corresponding updates may
|
||||
/// already have been flushed out of memtables.
|
||||
pub fn flushed_sequence(&self) -> SequenceNumber {
|
||||
self.version_control.current().version.flushed_sequence
|
||||
}
|
||||
|
||||
/// Returns whether the region is readonly.
|
||||
pub fn is_follower(&self) -> bool {
|
||||
self.manifest_ctx.state.load() == RegionRoleState::Follower
|
||||
|
||||
@@ -43,6 +43,7 @@ use table::metadata::{TableId, TableInfoRef};
|
||||
use table::table::scan::RegionScanExec;
|
||||
|
||||
use crate::error::{GetRegionMetadataSnafu, Result};
|
||||
use crate::options::FlowQueryExtensions;
|
||||
|
||||
/// Resolve to the given region (specified by [RegionId]) unconditionally.
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -182,11 +183,26 @@ impl TableProvider for DummyTableProvider {
|
||||
request.filters = filters.to_vec();
|
||||
request.limit = limit;
|
||||
|
||||
if let Some(query_ctx) = &self.query_ctx {
|
||||
let is_sink_scan = is_sink_scan(query_ctx, self.region_id)
|
||||
.map_err(|e| DataFusionError::External(Box::new(e)))?;
|
||||
apply_cached_snapshot_to_request(query_ctx, self.region_id, is_sink_scan, &mut request);
|
||||
}
|
||||
|
||||
let scanner = self
|
||||
.engine
|
||||
.handle_query(self.region_id, request.clone())
|
||||
.await
|
||||
.map_err(|e| DataFusionError::External(Box::new(e)))?;
|
||||
|
||||
if request.snapshot_on_scan
|
||||
&& let Some(query_ctx) = &self.query_ctx
|
||||
&& let Some(snapshot_sequence) = scanner.snapshot_sequence()
|
||||
{
|
||||
bind_snapshot_bound_region_seq(query_ctx, self.region_id, snapshot_sequence)
|
||||
.map_err(|e| DataFusionError::External(Box::new(e)))?;
|
||||
}
|
||||
|
||||
let query_memory_tracker = self.engine.query_memory_tracker();
|
||||
let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_tracker)?;
|
||||
if let Some(query_ctx) = &self.query_ctx {
|
||||
@@ -295,14 +311,11 @@ impl DummyTableProviderFactory {
|
||||
region_id,
|
||||
})?;
|
||||
|
||||
let scan_request = query_ctx
|
||||
.as_ref()
|
||||
.map(|ctx| ScanRequest {
|
||||
memtable_max_sequence: ctx.get_snapshot(region_id.as_u64()),
|
||||
sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
|
||||
..Default::default()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let scan_request = if let Some(ctx) = query_ctx.as_ref() {
|
||||
scan_request_from_query_context(region_id, ctx)?
|
||||
} else {
|
||||
ScanRequest::default()
|
||||
};
|
||||
|
||||
Ok(DummyTableProvider {
|
||||
region_id,
|
||||
@@ -314,6 +327,152 @@ impl DummyTableProviderFactory {
|
||||
}
|
||||
}
|
||||
|
||||
fn scan_request_from_query_context(
|
||||
region_id: RegionId,
|
||||
query_ctx: &QueryContext,
|
||||
) -> Result<ScanRequest> {
|
||||
let decision = decide_flow_scan(query_ctx, region_id)?;
|
||||
Ok(build_scan_request(query_ctx, region_id, &decision))
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct FlowScanDecision {
|
||||
/// Whether this region is the flow sink-table scan.
|
||||
/// Sink scans intentionally bypass incremental and snapshot-binding semantics.
|
||||
is_sink_scan: bool,
|
||||
/// Whether this scan should bind a memtable upper bound when opening the scan.
|
||||
/// This is only the initial intent; if a cached bound already exists in `query_ctx`,
|
||||
/// we reuse that cached bound instead and clear this flag.
|
||||
snapshot_on_scan: bool,
|
||||
/// Optional lower exclusive memtable sequence bound for incremental reads.
|
||||
/// When set, only rows with sequence strictly greater than this bound are read from memtables.
|
||||
memtable_min_sequence: Option<u64>,
|
||||
/// Optional cached per-region snapshot already bound in `query_ctx`.
|
||||
/// When present, this becomes the effective memtable upper bound and suppresses
|
||||
/// binding a new snapshot on scan open.
|
||||
memtable_max_sequence: Option<u64>,
|
||||
}
|
||||
|
||||
impl FlowScanDecision {
|
||||
fn plain_scan() -> Self {
|
||||
Self {
|
||||
is_sink_scan: true,
|
||||
snapshot_on_scan: false,
|
||||
memtable_min_sequence: None,
|
||||
memtable_max_sequence: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn decide_flow_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<FlowScanDecision> {
|
||||
let Some(flow_extensions) =
|
||||
FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())?
|
||||
else {
|
||||
return Ok(FlowScanDecision {
|
||||
is_sink_scan: false,
|
||||
snapshot_on_scan: false,
|
||||
memtable_min_sequence: None,
|
||||
memtable_max_sequence: query_ctx.get_snapshot(region_id.as_u64()),
|
||||
});
|
||||
};
|
||||
|
||||
// Sink-table scans intentionally bypass all flow scan semantics. They should
|
||||
// behave like plain reads and must not participate in incremental lower bounds
|
||||
// or per-region snapshot binding/reuse.
|
||||
if flow_extensions.sink_table_id == Some(region_id.table_id()) {
|
||||
return Ok(FlowScanDecision::plain_scan());
|
||||
}
|
||||
|
||||
let apply_incremental = flow_extensions.validate_for_scan(region_id)?;
|
||||
|
||||
let memtable_min_sequence = if apply_incremental {
|
||||
flow_extensions
|
||||
.incremental_after_seqs
|
||||
.as_ref()
|
||||
.and_then(|seqs| seqs.get(®ion_id.as_u64()))
|
||||
.copied()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let memtable_max_sequence = query_ctx.get_snapshot(region_id.as_u64());
|
||||
|
||||
Ok(FlowScanDecision {
|
||||
is_sink_scan: false,
|
||||
snapshot_on_scan: memtable_max_sequence.is_none()
|
||||
&& flow_extensions.should_collect_region_watermark(),
|
||||
memtable_min_sequence,
|
||||
memtable_max_sequence,
|
||||
})
|
||||
}
|
||||
|
||||
fn build_scan_request(
|
||||
query_ctx: &QueryContext,
|
||||
region_id: RegionId,
|
||||
decision: &FlowScanDecision,
|
||||
) -> ScanRequest {
|
||||
// Build the initial scan request from the final decision known at provider creation
|
||||
// time. A later scan may still refresh `memtable_max_sequence` if another source scan
|
||||
// has bound a snapshot into `query_ctx` after this provider was created.
|
||||
ScanRequest {
|
||||
sst_min_sequence: (!decision.is_sink_scan)
|
||||
.then(|| query_ctx.sst_min_sequence(region_id.as_u64()))
|
||||
.flatten(),
|
||||
snapshot_on_scan: decision.snapshot_on_scan,
|
||||
memtable_min_sequence: decision.memtable_min_sequence,
|
||||
memtable_max_sequence: decision.memtable_max_sequence,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn is_sink_scan(query_ctx: &QueryContext, region_id: RegionId) -> Result<bool> {
|
||||
Ok(
|
||||
FlowQueryExtensions::parse_flow_extensions(&query_ctx.extensions())?
|
||||
.is_some_and(|exts| exts.sink_table_id == Some(region_id.table_id())),
|
||||
)
|
||||
}
|
||||
|
||||
fn apply_cached_snapshot_to_request(
|
||||
query_ctx: &QueryContext,
|
||||
region_id: RegionId,
|
||||
is_sink_scan: bool,
|
||||
scan_request: &mut ScanRequest,
|
||||
) {
|
||||
if is_sink_scan {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(snapshot_sequence) = query_ctx.get_snapshot(region_id.as_u64()) {
|
||||
// Reuse the previously bound per-region snapshot instead of rebinding a new
|
||||
// upper bound on scan open. This refresh is still needed at scan time because
|
||||
// the provider's cached request may have been built before another source scan
|
||||
// bound the shared query-level snapshot into `query_ctx`.
|
||||
scan_request.memtable_max_sequence = Some(snapshot_sequence);
|
||||
scan_request.snapshot_on_scan = false;
|
||||
}
|
||||
}
|
||||
|
||||
fn bind_snapshot_bound_region_seq(
|
||||
query_ctx: &QueryContext,
|
||||
region_id: RegionId,
|
||||
snapshot_sequence: u64,
|
||||
) -> Result<u64> {
|
||||
if let Some(existing) = query_ctx.get_snapshot(region_id.as_u64()) {
|
||||
if existing != snapshot_sequence {
|
||||
return crate::error::ConflictingSnapshotSequenceSnafu {
|
||||
region_id,
|
||||
existing,
|
||||
new: snapshot_sequence,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
Ok(existing)
|
||||
} else {
|
||||
query_ctx.set_snapshot(region_id.as_u64(), snapshot_sequence);
|
||||
Ok(snapshot_sequence)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TableProviderFactory for DummyTableProviderFactory {
|
||||
async fn create(
|
||||
@@ -443,3 +602,277 @@ impl CatalogManager for DummyCatalogManager {
|
||||
Box::pin(futures::stream::empty())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use session::context::QueryContextBuilder;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::options::{FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE, FLOW_SINK_TABLE_ID};
|
||||
|
||||
fn test_region_id() -> RegionId {
|
||||
RegionId::new(1024, 1)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_query_context_uses_snapshot_bound_intent() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.extensions(HashMap::from([(
|
||||
"flow.return_region_seq".to_string(),
|
||||
"true".to_string(),
|
||||
)]))
|
||||
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
42_u64,
|
||||
)]))))
|
||||
.sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
7_u64,
|
||||
)]))))
|
||||
.build();
|
||||
|
||||
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
|
||||
|
||||
assert!(!request.snapshot_on_scan);
|
||||
assert_eq!(request.memtable_max_sequence, Some(42));
|
||||
assert_eq!(request.sst_min_sequence, Some(7));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_incremental_context_uses_snapshot_bound_intent() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.extensions(HashMap::from([(
|
||||
"flow.incremental_after_seqs".to_string(),
|
||||
format!(r#"{{"{}":10}}"#, region_id.as_u64()),
|
||||
)]))
|
||||
.build();
|
||||
|
||||
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
|
||||
|
||||
assert!(request.snapshot_on_scan);
|
||||
assert_eq!(request.memtable_min_sequence, Some(10));
|
||||
assert_eq!(request.memtable_max_sequence, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_query_context_keeps_snapshot_fields() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
100,
|
||||
)]))))
|
||||
.sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
90,
|
||||
)]))))
|
||||
.build();
|
||||
|
||||
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
|
||||
assert_eq!(request.memtable_max_sequence, Some(100));
|
||||
assert_eq!(request.sst_min_sequence, Some(90));
|
||||
assert_eq!(request.memtable_min_sequence, None);
|
||||
assert!(!request.snapshot_on_scan);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_query_context_reuses_existing_snapshot_for_incremental_scan() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.extensions(HashMap::from([(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
format!(r#"{{"{}":10}}"#, region_id.as_u64()),
|
||||
)]))
|
||||
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
42_u64,
|
||||
)]))))
|
||||
.build();
|
||||
|
||||
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
|
||||
|
||||
assert_eq!(request.memtable_min_sequence, Some(10));
|
||||
assert_eq!(request.memtable_max_sequence, Some(42));
|
||||
assert!(!request.snapshot_on_scan);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_apply_cached_snapshot_to_request_updates_cached_scan_request() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
88_u64,
|
||||
)]))))
|
||||
.build();
|
||||
let mut request = ScanRequest {
|
||||
snapshot_on_scan: true,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
apply_cached_snapshot_to_request(&query_ctx, region_id, false, &mut request);
|
||||
|
||||
assert_eq!(request.memtable_max_sequence, Some(88));
|
||||
assert!(!request.snapshot_on_scan);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_apply_cached_snapshot_to_request_skips_sink_scan() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
88_u64,
|
||||
)]))))
|
||||
.build();
|
||||
let mut request = ScanRequest {
|
||||
snapshot_on_scan: true,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
apply_cached_snapshot_to_request(&query_ctx, region_id, true, &mut request);
|
||||
|
||||
assert_eq!(request.memtable_max_sequence, None);
|
||||
assert!(request.snapshot_on_scan);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
42_u64,
|
||||
)]))))
|
||||
.build();
|
||||
|
||||
let err = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99).unwrap_err();
|
||||
|
||||
assert!(matches!(err, Error::ConflictingSnapshotSequence { .. }));
|
||||
assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(42));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bind_snapshot_bound_region_seq_sets_snapshot_once() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default().build();
|
||||
|
||||
let seq = bind_snapshot_bound_region_seq(&query_ctx, region_id, 99).unwrap();
|
||||
|
||||
assert_eq!(seq, 99);
|
||||
assert_eq!(query_ctx.get_snapshot(region_id.as_u64()), Some(99));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_query_context_applies_incremental_after_seq_for_source_scan() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.extensions(HashMap::from([
|
||||
(
|
||||
FLOW_INCREMENTAL_MODE.to_string(),
|
||||
"memtable_only".to_string(),
|
||||
),
|
||||
(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
format!(r#"{{"{}":55}}"#, region_id.as_u64()),
|
||||
),
|
||||
]))
|
||||
.build();
|
||||
|
||||
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
|
||||
assert_eq!(request.memtable_min_sequence, Some(55));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_query_context_does_not_apply_incremental_for_sink_table() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.extensions(HashMap::from([
|
||||
(
|
||||
FLOW_INCREMENTAL_MODE.to_string(),
|
||||
"memtable_only".to_string(),
|
||||
),
|
||||
(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
format!(r#"{{"{}":55}}"#, region_id.as_u64()),
|
||||
),
|
||||
(
|
||||
FLOW_SINK_TABLE_ID.to_string(),
|
||||
region_id.table_id().to_string(),
|
||||
),
|
||||
]))
|
||||
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
88_u64,
|
||||
)]))))
|
||||
.sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(
|
||||
region_id.as_u64(),
|
||||
77_u64,
|
||||
)]))))
|
||||
.build();
|
||||
|
||||
let request = scan_request_from_query_context(region_id, &query_ctx).unwrap();
|
||||
assert_eq!(request.memtable_min_sequence, None);
|
||||
assert_eq!(request.memtable_max_sequence, None);
|
||||
assert_eq!(request.sst_min_sequence, None);
|
||||
assert!(!request.snapshot_on_scan);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_query_context_rejects_missing_memtable_only_region() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.extensions(HashMap::from([
|
||||
(
|
||||
FLOW_INCREMENTAL_MODE.to_string(),
|
||||
"memtable_only".to_string(),
|
||||
),
|
||||
(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
r#"{"9":55}"#.to_string(),
|
||||
),
|
||||
]))
|
||||
.build();
|
||||
|
||||
let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
|
||||
assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_query_context_rejects_invalid_incremental_json() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.extensions(HashMap::from([(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
"not-json".to_string(),
|
||||
)]))
|
||||
.build();
|
||||
|
||||
let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
|
||||
assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
|
||||
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_scan_request_from_query_context_rejects_invalid_sink_table_id() {
|
||||
let region_id = test_region_id();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.extensions(HashMap::from([(
|
||||
FLOW_SINK_TABLE_ID.to_string(),
|
||||
"abc".to_string(),
|
||||
)]))
|
||||
.build();
|
||||
|
||||
let err = scan_request_from_query_context(region_id, &query_ctx).unwrap_err();
|
||||
assert!(matches!(err, Error::InvalidQueryContextExtension { .. }));
|
||||
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -375,6 +375,20 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Conflicting snapshot sequence observed for region {} in a single query (existing={}, new={})",
|
||||
region_id,
|
||||
existing,
|
||||
new
|
||||
))]
|
||||
ConflictingSnapshotSequence {
|
||||
region_id: RegionId,
|
||||
existing: u64,
|
||||
new: u64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(transparent)]
|
||||
Datatypes {
|
||||
source: datatypes::error::Error,
|
||||
@@ -407,7 +421,8 @@ impl ErrorExt for Error {
|
||||
| CteColumnSchemaMismatch { .. }
|
||||
| ConvertValue { .. }
|
||||
| TryIntoDuration { .. }
|
||||
| InvalidQueryContextExtension { .. } => StatusCode::InvalidArguments,
|
||||
| InvalidQueryContextExtension { .. }
|
||||
| ConflictingSnapshotSequence { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,
|
||||
|
||||
|
||||
@@ -71,7 +71,21 @@ pub struct FlowQueryExtensions {
|
||||
}
|
||||
|
||||
impl FlowQueryExtensions {
|
||||
pub fn from_extensions(extensions: &HashMap<String, String>) -> Result<Self> {
|
||||
/// Parses flow-specific query extensions when any flow key is present.
|
||||
///
|
||||
/// Returns `Ok(None)` for ordinary queries with no flow-related extensions,
|
||||
/// `Ok(Some(_))` when flow context is present and valid, and `Err(_)` when a
|
||||
/// flow-related extension is present but malformed or incomplete.
|
||||
pub fn parse_flow_extensions(extensions: &HashMap<String, String>) -> Result<Option<Self>> {
|
||||
let has_flow_context = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS)
|
||||
|| extensions.contains_key(FLOW_INCREMENTAL_MODE)
|
||||
|| extensions.contains_key(FLOW_RETURN_REGION_SEQ)
|
||||
|| extensions.contains_key(FLOW_SINK_TABLE_ID);
|
||||
|
||||
if !has_flow_context {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let incremental_mode = extensions
|
||||
.get(FLOW_INCREMENTAL_MODE)
|
||||
.map(|value| match value.as_str() {
|
||||
@@ -127,12 +141,12 @@ impl FlowQueryExtensions {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
Ok(Some(Self {
|
||||
incremental_after_seqs,
|
||||
incremental_mode,
|
||||
return_region_seq,
|
||||
sink_table_id,
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn validate_for_scan(&self, source_region_id: RegionId) -> Result<bool> {
|
||||
@@ -230,14 +244,11 @@ mod flow_extension_tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_default() {
|
||||
fn test_parse_flow_extensions_returns_none_for_non_flow_query() {
|
||||
let exts = HashMap::new();
|
||||
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
|
||||
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap();
|
||||
|
||||
assert_eq!(parsed.incremental_mode, None);
|
||||
assert_eq!(parsed.incremental_after_seqs, None);
|
||||
assert!(!parsed.return_region_seq);
|
||||
assert_eq!(parsed.sink_table_id, None);
|
||||
assert_eq!(parsed, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -255,7 +266,9 @@ mod flow_extension_tests {
|
||||
(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
|
||||
]);
|
||||
|
||||
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
|
||||
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
parsed.incremental_mode,
|
||||
Some(FlowIncrementalMode::MemtableOnly)
|
||||
@@ -275,7 +288,7 @@ mod flow_extension_tests {
|
||||
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
|
||||
)]);
|
||||
|
||||
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
|
||||
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
|
||||
assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
|
||||
}
|
||||
|
||||
@@ -283,7 +296,7 @@ mod flow_extension_tests {
|
||||
fn test_parse_flow_extensions_invalid_mode() {
|
||||
let exts = HashMap::from([(FLOW_INCREMENTAL_MODE.to_string(), "foo".to_string())]);
|
||||
|
||||
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
|
||||
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
|
||||
assert!(format!("{err}").contains(FLOW_INCREMENTAL_MODE));
|
||||
}
|
||||
|
||||
@@ -300,7 +313,7 @@ mod flow_extension_tests {
|
||||
),
|
||||
]);
|
||||
|
||||
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
|
||||
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
|
||||
assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
|
||||
}
|
||||
|
||||
@@ -311,7 +324,9 @@ mod flow_extension_tests {
|
||||
r#"{"1":"10","2":"20"}"#.to_string(),
|
||||
)]);
|
||||
|
||||
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
|
||||
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
parsed.incremental_after_seqs.unwrap(),
|
||||
HashMap::from([(1, 10), (2, 20)])
|
||||
@@ -325,7 +340,7 @@ mod flow_extension_tests {
|
||||
r#"{"1":true}"#.to_string(),
|
||||
)]);
|
||||
|
||||
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
|
||||
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
|
||||
assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
|
||||
}
|
||||
|
||||
@@ -333,7 +348,7 @@ mod flow_extension_tests {
|
||||
fn test_parse_flow_extensions_invalid_sink_table_id() {
|
||||
let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "x".to_string())]);
|
||||
|
||||
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
|
||||
let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
|
||||
assert!(format!("{err}").contains(FLOW_SINK_TABLE_ID));
|
||||
}
|
||||
|
||||
@@ -352,7 +367,9 @@ mod flow_extension_tests {
|
||||
),
|
||||
]);
|
||||
|
||||
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
|
||||
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let err = parsed.validate_for_scan(source_region_id).unwrap_err();
|
||||
assert!(format!("{err}").contains("Missing region"));
|
||||
}
|
||||
@@ -372,7 +389,9 @@ mod flow_extension_tests {
|
||||
(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
|
||||
]);
|
||||
|
||||
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
|
||||
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let apply_incremental = parsed.validate_for_scan(source_region_id).unwrap();
|
||||
assert!(!apply_incremental);
|
||||
}
|
||||
@@ -400,4 +419,43 @@ mod flow_extension_tests {
|
||||
};
|
||||
assert!(parsed.should_collect_region_watermark());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_return_region_seq_only_returns_some() {
|
||||
let exts = HashMap::from([(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string())]);
|
||||
|
||||
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert!(parsed.return_region_seq);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_sink_table_only_returns_some() {
|
||||
let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string())]);
|
||||
|
||||
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(parsed.sink_table_id, Some(1024));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_incremental_after_seqs_only_returns_some() {
|
||||
let exts = HashMap::from([(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
r#"{"1":10}"#.to_string(),
|
||||
)]);
|
||||
|
||||
let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
parsed.incremental_after_seqs,
|
||||
Some(HashMap::from([(1, 10)]))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user