diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index 8803f98e0d..5b22c81857 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -37,6 +37,8 @@ fn test_query_context() -> QueryContext { timezone: "UTC".to_string(), extensions: HashMap::new(), channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), } } @@ -251,6 +253,10 @@ fn test_create_flow_data_new_format_serialization() { catalog: "new_catalog".to_string(), schema: "new_schema".to_string(), timezone: "America/New_York".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), }; let data = CreateFlowData { @@ -272,6 +278,9 @@ fn test_create_flow_data_new_format_serialization() { assert_eq!(deserialized.flow_context.catalog, "new_catalog"); assert_eq!(deserialized.flow_context.schema, "new_schema"); assert_eq!(deserialized.flow_context.timezone, "America/New_York"); + assert_eq!(deserialized.flow_context.channel, 0); + assert_eq!(deserialized.flow_context.snapshot_seqs, HashMap::new()); + assert_eq!(deserialized.flow_context.sst_min_sequences, HashMap::new()); } #[test] @@ -286,6 +295,8 @@ fn test_flow_query_context_conversion_from_query_context() { ] .into(), channel: 99, + snapshot_seqs: HashMap::from([(1, 10)]), + sst_min_sequences: HashMap::from([(1, 8)]), }; let flow_context: FlowQueryContext = query_context.into(); @@ -293,6 +304,9 @@ fn test_flow_query_context_conversion_from_query_context() { assert_eq!(flow_context.catalog, "prod_catalog"); assert_eq!(flow_context.schema, "public"); assert_eq!(flow_context.timezone, "America/Los_Angeles"); + assert_eq!(flow_context.channel, 99); + assert_eq!(flow_context.snapshot_seqs, HashMap::from([(1, 10)])); + assert_eq!(flow_context.sst_min_sequences, HashMap::from([(1, 8)])); } #[test] @@ -301,6 +315,10 @@ fn test_flow_info_conversion_with_flow_context() { catalog: "info_catalog".to_string(), schema: "info_schema".to_string(), timezone: "Europe/Berlin".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), }; let data = CreateFlowData { @@ -349,6 +367,10 @@ fn test_mixed_serialization_format_support() { catalog: "test".to_string(), schema: "test".to_string(), timezone: "UTC".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), }; assert_eq!(ctx_from_new, expected_new); } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 1e94a8f092..ed6f78154a 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -1432,6 +1432,12 @@ pub struct QueryContext { pub timezone: String, pub extensions: HashMap, pub channel: u8, + /// Maps region id -> snapshot upper bound sequence for that region. + #[serde(default)] + pub snapshot_seqs: HashMap, + /// Maps region id -> minimal SST sequence allowed for that region. + #[serde(default)] + pub sst_min_sequences: HashMap, } impl QueryContext { @@ -1459,6 +1465,14 @@ impl QueryContext { pub fn channel(&self) -> u8 { self.channel } + + pub fn snapshot_seqs(&self) -> &HashMap { + &self.snapshot_seqs + } + + pub fn sst_min_sequences(&self) -> &HashMap { + &self.sst_min_sequences + } } /// Lightweight query context for flow operations containing only essential fields. @@ -1466,12 +1480,24 @@ impl QueryContext { /// for flow creation and execution. #[derive(Debug, Clone, Serialize, PartialEq)] pub struct FlowQueryContext { - /// Current catalog name - needed for flow metadata and recovery + /// Current catalog name used for flow metadata and execution. pub catalog: String, - /// Current schema name - needed for table resolution during flow execution + /// Current schema name used for table resolution during flow execution. pub schema: String, - /// Timezone for timestamp operations in the flow + /// Timezone used for timestamp evaluation in the flow. pub timezone: String, + /// Query extensions carried into flow execution. + #[serde(default)] + pub extensions: HashMap, + /// Request channel propagated from the original query context. + #[serde(default)] + pub channel: u8, + /// Per-region snapshot upper bounds bound during query planning/execution. + #[serde(default)] + pub snapshot_seqs: HashMap, + /// Per-region lower SST scan bounds carried with the flow context. + #[serde(default)] + pub sst_min_sequences: HashMap, } impl<'de> Deserialize<'de> for FlowQueryContext { @@ -1492,6 +1518,14 @@ impl<'de> Deserialize<'de> for FlowQueryContext { catalog: String, schema: String, timezone: String, + #[serde(default)] + extensions: HashMap, + #[serde(default)] + channel: u8, + #[serde(default)] + snapshot_seqs: HashMap, + #[serde(default)] + sst_min_sequences: HashMap, } match ContextCompat::deserialize(deserializer)? { @@ -1499,6 +1533,10 @@ impl<'de> Deserialize<'de> for FlowQueryContext { catalog: helper.catalog, schema: helper.schema, timezone: helper.timezone, + extensions: helper.extensions, + channel: helper.channel, + snapshot_seqs: helper.snapshot_seqs, + sst_min_sequences: helper.sst_min_sequences, }), ContextCompat::Full(full_ctx) => Ok(full_ctx.into()), } @@ -1507,12 +1545,19 @@ impl<'de> Deserialize<'de> for FlowQueryContext { impl From for QueryContext { fn from(pb_ctx: PbQueryContext) -> Self { + let (snapshot_seqs, sst_min_sequences) = pb_ctx + .snapshot_seqs + .map(|seqs| (seqs.snapshot_seqs, seqs.sst_min_sequences)) + .unwrap_or_default(); + Self { current_catalog: pb_ctx.current_catalog, current_schema: pb_ctx.current_schema, timezone: pb_ctx.timezone, extensions: pb_ctx.extensions, channel: pb_ctx.channel as u8, + snapshot_seqs, + sst_min_sequences, } } } @@ -1525,6 +1570,8 @@ impl From for PbQueryContext { timezone, extensions, channel, + snapshot_seqs, + sst_min_sequences, }: QueryContext, ) -> Self { PbQueryContext { @@ -1533,7 +1580,12 @@ impl From for PbQueryContext { timezone, extensions, channel: channel as u32, - snapshot_seqs: None, + snapshot_seqs: (!snapshot_seqs.is_empty() || !sst_min_sequences.is_empty()).then_some( + api::v1::SnapshotSequences { + snapshot_seqs, + sst_min_sequences, + }, + ), explain: None, } } @@ -1545,6 +1597,10 @@ impl From for FlowQueryContext { catalog: ctx.current_catalog, schema: ctx.current_schema, timezone: ctx.timezone, + extensions: ctx.extensions, + channel: ctx.channel, + snapshot_seqs: ctx.snapshot_seqs, + sst_min_sequences: ctx.sst_min_sequences, } } } @@ -1555,8 +1611,10 @@ impl From for QueryContext { current_catalog: flow_ctx.catalog, current_schema: flow_ctx.schema, timezone: flow_ctx.timezone, - extensions: HashMap::new(), - channel: 0, // Use default channel for flows + extensions: flow_ctx.extensions, + channel: flow_ctx.channel, + snapshot_seqs: flow_ctx.snapshot_seqs, + sst_min_sequences: flow_ctx.sst_min_sequences, } } } @@ -1720,6 +1778,8 @@ mod tests { timezone: "UTC".to_string(), extensions, channel: 5, + snapshot_seqs: HashMap::from([(10, 100)]), + sst_min_sequences: HashMap::from([(10, 90)]), }; let flow_ctx: FlowQueryContext = query_ctx.into(); @@ -1727,6 +1787,9 @@ mod tests { assert_eq!(flow_ctx.catalog, "test_catalog"); assert_eq!(flow_ctx.schema, "test_schema"); assert_eq!(flow_ctx.timezone, "UTC"); + assert_eq!(flow_ctx.channel, 5); + assert_eq!(flow_ctx.snapshot_seqs, HashMap::from([(10, 100)])); + assert_eq!(flow_ctx.sst_min_sequences, HashMap::from([(10, 90)])); } #[test] @@ -1735,6 +1798,10 @@ mod tests { catalog: "prod_catalog".to_string(), schema: "public".to_string(), timezone: "America/New_York".to_string(), + extensions: HashMap::from([("k".to_string(), "v".to_string())]), + channel: 7, + snapshot_seqs: HashMap::from([(11, 111)]), + sst_min_sequences: HashMap::from([(11, 101)]), }; let query_ctx: QueryContext = flow_ctx.clone().into(); @@ -1742,8 +1809,13 @@ mod tests { assert_eq!(query_ctx.current_catalog, "prod_catalog"); assert_eq!(query_ctx.current_schema, "public"); assert_eq!(query_ctx.timezone, "America/New_York"); - assert!(query_ctx.extensions.is_empty()); - assert_eq!(query_ctx.channel, 0); + assert_eq!( + query_ctx.extensions, + HashMap::from([("k".to_string(), "v".to_string())]) + ); + assert_eq!(query_ctx.channel, 7); + assert_eq!(query_ctx.snapshot_seqs, HashMap::from([(11, 111)])); + assert_eq!(query_ctx.sst_min_sequences, HashMap::from([(11, 101)])); // Test roundtrip conversion let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into(); @@ -1756,6 +1828,10 @@ mod tests { catalog: "test_catalog".to_string(), schema: "test_schema".to_string(), timezone: "UTC".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), }; let serialized = serde_json::to_string(&flow_ctx).unwrap(); @@ -1776,6 +1852,10 @@ mod tests { catalog: "pb_catalog".to_string(), schema: "pb_schema".to_string(), timezone: "Asia/Tokyo".to_string(), + extensions: HashMap::from([("x".to_string(), "y".to_string())]), + channel: 6, + snapshot_seqs: HashMap::from([(3, 30)]), + sst_min_sequences: HashMap::from([(3, 21)]), }; let pb_ctx: PbQueryContext = flow_ctx.into(); @@ -1783,9 +1863,44 @@ mod tests { assert_eq!(pb_ctx.current_catalog, "pb_catalog"); assert_eq!(pb_ctx.current_schema, "pb_schema"); assert_eq!(pb_ctx.timezone, "Asia/Tokyo"); - assert!(pb_ctx.extensions.is_empty()); - assert_eq!(pb_ctx.channel, 0); - assert!(pb_ctx.snapshot_seqs.is_none()); + assert_eq!( + pb_ctx.extensions, + HashMap::from([("x".to_string(), "y".to_string())]) + ); + assert_eq!(pb_ctx.channel, 6); + assert_eq!( + pb_ctx.snapshot_seqs, + Some(api::v1::SnapshotSequences { + snapshot_seqs: HashMap::from([(3, 30)]), + sst_min_sequences: HashMap::from([(3, 21)]), + }) + ); assert!(pb_ctx.explain.is_none()); } + + #[test] + fn test_pb_query_context_roundtrip_with_snapshot_sequences() { + let pb = PbQueryContext { + current_catalog: "c1".to_string(), + current_schema: "s1".to_string(), + timezone: "UTC".to_string(), + extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]), + channel: 3, + snapshot_seqs: Some(api::v1::SnapshotSequences { + snapshot_seqs: HashMap::from([(1, 100)]), + sst_min_sequences: HashMap::from([(1, 90)]), + }), + explain: None, + }; + + let query_ctx: QueryContext = pb.clone().into(); + let pb_roundtrip: PbQueryContext = query_ctx.into(); + + assert_eq!(pb_roundtrip.current_catalog, pb.current_catalog); + assert_eq!(pb_roundtrip.current_schema, pb.current_schema); + assert_eq!(pb_roundtrip.timezone, pb.timezone); + assert_eq!(pb_roundtrip.extensions, pb.extensions); + assert_eq!(pb_roundtrip.channel, pb.channel); + assert_eq!(pb_roundtrip.snapshot_seqs, pb.snapshot_seqs); + } } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 693ae325df..175ebef237 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -94,7 +94,7 @@ impl RegionEngine for FileRegionEngine { let stream = self.handle_query(region_id, request).await?; let metadata = self.get_metadata(region_id).await?; // We don't support enabling append mode for file engine. - let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata)); + let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata, None)); Ok(scanner) } diff --git a/src/operator/src/utils.rs b/src/operator/src/utils.rs index 93da5f028e..6e9386b3fa 100644 --- a/src/operator/src/utils.rs +++ b/src/operator/src/utils.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::{Arc, RwLock}; + use common_time::Timezone; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::ResultExt; @@ -27,6 +29,8 @@ pub fn to_meta_query_context( timezone: query_context.timezone().to_string(), extensions: query_context.extensions(), channel: query_context.channel() as u8, + snapshot_seqs: query_context.snapshots(), + sst_min_sequences: query_context.sst_min_sequences(), } } @@ -43,5 +47,41 @@ pub fn try_to_session_query_context( ) .extensions(value.extensions) .channel((value.channel as u32).into()) + .snapshot_seqs(Arc::new(RwLock::new(value.snapshot_seqs))) + .sst_min_sequences(Arc::new(RwLock::new(value.sst_min_sequences))) .build()) } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::{Arc, RwLock}; + + use common_time::Timezone; + use session::context::QueryContextBuilder; + + use super::{to_meta_query_context, try_to_session_query_context}; + + #[test] + fn test_query_context_meta_roundtrip_with_sequences() { + let session_ctx = Arc::new( + QueryContextBuilder::default() + .current_catalog("c1".to_string()) + .current_schema("s1".to_string()) + .timezone(Timezone::from_tz_string("UTC").unwrap()) + .set_extension("flow.return_region_seq".to_string(), "true".to_string()) + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(10, 100)])))) + .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(10, 90)])))) + .build(), + ); + + let meta_ctx = to_meta_query_context(session_ctx); + let roundtrip = try_to_session_query_context(meta_ctx).unwrap(); + + assert_eq!(roundtrip.current_catalog(), "c1"); + assert_eq!(roundtrip.current_schema(), "s1"); + assert_eq!(roundtrip.snapshots(), HashMap::from([(10, 100)])); + assert_eq!(roundtrip.sst_min_sequences(), HashMap::from([(10, 90)])); + assert_eq!(roundtrip.extension("flow.return_region_seq"), Some("true")); + } +} diff --git a/src/query/src/error.rs b/src/query/src/error.rs index f863a26c4a..b3a4ebeba5 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -368,6 +368,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid query context extension: {}", reason))] + InvalidQueryContextExtension { + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(transparent)] Datatypes { source: datatypes::error::Error, @@ -399,7 +406,8 @@ impl ErrorExt for Error { | ColumnSchemaNoDefault { .. } | CteColumnSchemaMismatch { .. } | ConvertValue { .. } - | TryIntoDuration { .. } => StatusCode::InvalidArguments, + | TryIntoDuration { .. } + | InvalidQueryContextExtension { .. } => StatusCode::InvalidArguments, BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, diff --git a/src/query/src/options.rs b/src/query/src/options.rs index 50ca1177a5..9b60b64759 100644 --- a/src/query/src/options.rs +++ b/src/query/src/options.rs @@ -12,8 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use common_base::memory_limit::MemoryLimit; use serde::{Deserialize, Serialize}; +use store_api::storage::RegionId; +use table::metadata::TableId; + +use crate::error::{Error, InvalidQueryContextExtensionSnafu, Result}; + +pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs"; +pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode"; +pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq"; +pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id"; + +pub const FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY: &str = "memtable_only"; /// Query engine config #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -39,3 +52,352 @@ impl Default for QueryOptions { } } } + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FlowIncrementalMode { + MemtableOnly, +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct FlowQueryExtensions { + /// Maps region id -> lower exclusive sequence bound for incremental reads. + pub incremental_after_seqs: Option>, + /// Incremental read mode requested by the caller. + pub incremental_mode: Option, + /// Whether the caller expects per-region watermark metadata in terminal metrics. + pub return_region_seq: bool, + /// Optional sink table id used to distinguish source scans from sink reads. + pub sink_table_id: Option, +} + +impl FlowQueryExtensions { + pub fn from_extensions(extensions: &HashMap) -> Result { + let incremental_mode = extensions + .get(FLOW_INCREMENTAL_MODE) + .map(|value| match value.as_str() { + v if v.eq_ignore_ascii_case(FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY) => { + Ok(FlowIncrementalMode::MemtableOnly) + } + _ => Err(invalid_query_context_extension(format!( + "Invalid value for {}: {}", + FLOW_INCREMENTAL_MODE, value + ))), + }) + .transpose()?; + + let incremental_after_seqs = extensions + .get(FLOW_INCREMENTAL_AFTER_SEQS) + .map(|value| parse_incremental_after_seqs(value.as_str())) + .transpose()?; + + let return_region_seq = extensions + .get(FLOW_RETURN_REGION_SEQ) + .map(|value| parse_bool(value.as_str())) + .transpose()? + .unwrap_or(false); + + let sink_table_id = extensions + .get(FLOW_SINK_TABLE_ID) + .map(|value| { + value.parse::().map_err(|_| { + invalid_query_context_extension(format!( + "Invalid value for {}: {}", + FLOW_SINK_TABLE_ID, value + )) + }) + }) + .transpose()?; + + if matches!(incremental_mode, Some(FlowIncrementalMode::MemtableOnly)) { + let after_seqs = incremental_after_seqs.as_ref().ok_or_else(|| { + invalid_query_context_extension(format!( + "{} is required when {}={}.", + FLOW_INCREMENTAL_AFTER_SEQS, + FLOW_INCREMENTAL_MODE, + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY + )) + })?; + if after_seqs.is_empty() { + return Err(invalid_query_context_extension(format!( + "{} must not be empty when {}={}.", + FLOW_INCREMENTAL_AFTER_SEQS, + FLOW_INCREMENTAL_MODE, + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY + ))); + } + } + + Ok(Self { + incremental_after_seqs, + incremental_mode, + return_region_seq, + sink_table_id, + }) + } + + pub fn validate_for_scan(&self, source_region_id: RegionId) -> Result { + if self.sink_table_id.is_some() && self.sink_table_id == Some(source_region_id.table_id()) { + return Ok(false); + } + + if matches!( + self.incremental_mode, + Some(FlowIncrementalMode::MemtableOnly) + ) { + let after_seqs = self.incremental_after_seqs.as_ref().ok_or_else(|| { + invalid_query_context_extension(format!( + "{} is required when {}=memtable_only.", + FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE + )) + })?; + + if !after_seqs.contains_key(&source_region_id.as_u64()) { + return Err(invalid_query_context_extension(format!( + "Missing region {} in {} when {}=memtable_only.", + source_region_id, FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE + ))); + } + } + + Ok(self.incremental_after_seqs.is_some()) + } + + pub fn should_collect_region_watermark(&self) -> bool { + self.return_region_seq || self.incremental_after_seqs.is_some() + } +} + +fn parse_incremental_after_seqs(value: &str) -> Result> { + let raw = serde_json::from_str::>(value).map_err(|e| { + invalid_query_context_extension(format!( + "Invalid JSON for {}: {} ({})", + FLOW_INCREMENTAL_AFTER_SEQS, value, e + )) + })?; + + raw.into_iter() + .map(|(region_id, raw_seq)| { + let region_id = region_id.parse::().map_err(|_| { + invalid_query_context_extension(format!( + "Invalid region id in {}: {}", + FLOW_INCREMENTAL_AFTER_SEQS, region_id + )) + })?; + + let seq = match raw_seq { + serde_json::Value::Number(num) => num.as_u64().ok_or_else(|| { + invalid_query_context_extension(format!( + "Invalid sequence value in {} for region {}: {}", + FLOW_INCREMENTAL_AFTER_SEQS, region_id, num + )) + })?, + serde_json::Value::String(s) => s.parse::().map_err(|_| { + invalid_query_context_extension(format!( + "Invalid sequence string in {} for region {}: {}", + FLOW_INCREMENTAL_AFTER_SEQS, region_id, s + )) + })?, + _ => { + return Err(invalid_query_context_extension(format!( + "Invalid sequence value type in {} for region {}", + FLOW_INCREMENTAL_AFTER_SEQS, region_id + ))); + } + }; + + Ok((region_id, seq)) + }) + .collect() +} + +fn parse_bool(value: &str) -> Result { + match value { + v if v.eq_ignore_ascii_case("true") => Ok(true), + v if v.eq_ignore_ascii_case("false") => Ok(false), + _ => Err(invalid_query_context_extension(format!( + "Invalid value for {}: {}", + FLOW_RETURN_REGION_SEQ, value + ))), + } +} + +fn invalid_query_context_extension(reason: String) -> Error { + InvalidQueryContextExtensionSnafu { reason }.build() +} + +#[cfg(test)] +mod flow_extension_tests { + use super::*; + + #[test] + fn test_parse_flow_extensions_default() { + let exts = HashMap::new(); + let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + + assert_eq!(parsed.incremental_mode, None); + assert_eq!(parsed.incremental_after_seqs, None); + assert!(!parsed.return_region_seq); + assert_eq!(parsed.sink_table_id, None); + } + + #[test] + fn test_parse_flow_extensions_memtable_only_success() { + let exts = HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":10,"2":20}"#.to_string(), + ), + (FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string()), + (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()), + ]); + + let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + assert_eq!( + parsed.incremental_mode, + Some(FlowIncrementalMode::MemtableOnly) + ); + assert_eq!( + parsed.incremental_after_seqs.unwrap(), + HashMap::from([(1, 10), (2, 20)]) + ); + assert!(parsed.return_region_seq); + assert_eq!(parsed.sink_table_id, Some(1024)); + } + + #[test] + fn test_parse_flow_extensions_mode_requires_after_seqs() { + let exts = HashMap::from([( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + )]); + + let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS)); + } + + #[test] + fn test_parse_flow_extensions_invalid_mode() { + let exts = HashMap::from([(FLOW_INCREMENTAL_MODE.to_string(), "foo".to_string())]); + + let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + assert!(format!("{err}").contains(FLOW_INCREMENTAL_MODE)); + } + + #[test] + fn test_parse_flow_extensions_invalid_after_seqs_json() { + let exts = HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + "not-json".to_string(), + ), + ]); + + let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS)); + } + + #[test] + fn test_parse_flow_extensions_after_seqs_string_values() { + let exts = HashMap::from([( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":"10","2":"20"}"#.to_string(), + )]); + + let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + assert_eq!( + parsed.incremental_after_seqs.unwrap(), + HashMap::from([(1, 10), (2, 20)]) + ); + } + + #[test] + fn test_parse_flow_extensions_after_seqs_invalid_value_type() { + let exts = HashMap::from([( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":true}"#.to_string(), + )]); + + let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS)); + } + + #[test] + fn test_parse_flow_extensions_invalid_sink_table_id() { + let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "x".to_string())]); + + let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + assert!(format!("{err}").contains(FLOW_SINK_TABLE_ID)); + } + + #[test] + fn test_validate_for_scan_missing_source_region() { + let source_region_id = RegionId::new(100, 2); + let existing_region_id = RegionId::new(100, 1); + let exts = HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + format!(r#"{{"{}":10}}"#, existing_region_id.as_u64()), + ), + ]); + + let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let err = parsed.validate_for_scan(source_region_id).unwrap_err(); + assert!(format!("{err}").contains("Missing region")); + } + + #[test] + fn test_validate_for_scan_sink_table_excluded() { + let source_region_id = RegionId::new(1024, 1); + let exts = HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + format!(r#"{{"{}":10}}"#, source_region_id.as_u64()), + ), + (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()), + ]); + + let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let apply_incremental = parsed.validate_for_scan(source_region_id).unwrap(); + assert!(!apply_incremental); + } + + #[test] + fn test_should_collect_region_watermark_defaults_false() { + let parsed = FlowQueryExtensions::default(); + assert!(!parsed.should_collect_region_watermark()); + } + + #[test] + fn test_should_collect_region_watermark_true_for_return_region_seq() { + let parsed = FlowQueryExtensions { + return_region_seq: true, + ..Default::default() + }; + assert!(parsed.should_collect_region_watermark()); + } + + #[test] + fn test_should_collect_region_watermark_true_for_incremental_query() { + let parsed = FlowQueryExtensions { + incremental_after_seqs: Some(HashMap::from([(1, 10)])), + ..Default::default() + }; + assert!(parsed.should_collect_region_watermark()); + } +} diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 2b9483aca8..5f16ea8b5a 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -433,10 +433,21 @@ impl QueryContext { self.snapshot_seqs.read().unwrap().clone() } + pub fn sst_min_sequences(&self) -> HashMap { + self.sst_min_sequences.read().unwrap().clone() + } + pub fn get_snapshot(&self, region_id: u64) -> Option { self.snapshot_seqs.read().unwrap().get(®ion_id).cloned() } + pub fn set_snapshot(&self, region_id: u64, sequence: u64) { + self.snapshot_seqs + .write() + .unwrap() + .insert(region_id, sequence); + } + /// Returns `true` if the session can cast strings to numbers in MySQL style. pub fn auto_string_to_numeric(&self) -> bool { matches!(self.channel, Channel::Mysql) @@ -669,6 +680,8 @@ impl ConfigurationVariables { #[cfg(test)] mod test { + use std::collections::HashMap; + use common_catalog::consts::DEFAULT_CATALOG_NAME; use super::*; @@ -704,4 +717,30 @@ mod test { let context = QueryContext::with(DEFAULT_CATALOG_NAME, "test"); assert_eq!("test", context.get_db_string()); } + + #[test] + fn test_api_query_context_roundtrip_with_sequences() { + let api_ctx = api::v1::QueryContext { + current_catalog: "c1".to_string(), + current_schema: "s1".to_string(), + timezone: "UTC".to_string(), + extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]), + channel: Channel::Grpc as u32, + snapshot_seqs: Some(api::v1::SnapshotSequences { + snapshot_seqs: HashMap::from([(1, 100)]), + sst_min_sequences: HashMap::from([(1, 90)]), + }), + explain: None, + }; + + let session_ctx: QueryContext = api_ctx.clone().into(); + let roundtrip_api: api::v1::QueryContext = session_ctx.into(); + + assert_eq!(roundtrip_api.current_catalog, api_ctx.current_catalog); + assert_eq!(roundtrip_api.current_schema, api_ctx.current_schema); + assert_eq!(roundtrip_api.timezone, api_ctx.timezone); + assert_eq!(roundtrip_api.extensions, api_ctx.extensions); + assert_eq!(roundtrip_api.channel, api_ctx.channel); + assert_eq!(roundtrip_api.snapshot_seqs, api_ctx.snapshot_seqs); + } } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 115c841f93..287f64d225 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -462,6 +462,10 @@ pub trait RegionScanner: Debug + DisplayAs + Send { /// Sets whether the scanner is reading a logical region. fn set_logical_region(&mut self, logical_region: bool); + + fn snapshot_sequence(&self) -> Option { + None + } } pub type RegionScannerRef = Box; @@ -945,6 +949,7 @@ pub struct SinglePartitionScanner { schema: SchemaRef, properties: ScannerProperties, metadata: RegionMetadataRef, + snapshot_sequence: Option, } impl SinglePartitionScanner { @@ -953,6 +958,7 @@ impl SinglePartitionScanner { stream: SendableRecordBatchStream, append_mode: bool, metadata: RegionMetadataRef, + snapshot_sequence: Option, ) -> Self { let schema = stream.schema(); Self { @@ -960,6 +966,7 @@ impl SinglePartitionScanner { schema, properties: ScannerProperties::default().with_append_mode(append_mode), metadata, + snapshot_sequence, } } } @@ -1019,6 +1026,10 @@ impl RegionScanner for SinglePartitionScanner { fn set_logical_region(&mut self, logical_region: bool) { self.properties.set_logical_region(logical_region); } + + fn snapshot_sequence(&self) -> Option { + self.snapshot_sequence + } } impl DisplayAs for SinglePartitionScanner { diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 6725de92e3..d072ec1b39 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -112,6 +112,8 @@ pub struct ScanRequest { /// Optional constraint on the sequence number of the rows to read. /// If set, only rows with a sequence number **lesser or equal** to this value /// will be returned. + /// This is the effective memtable upper bound used by the scan, whether provided + /// explicitly or bound on scan open. pub memtable_max_sequence: Option, /// Optional constraint on the minimal sequence number in the memtable. /// If set, only the memtables that contain sequences **greater than** this value will be scanned @@ -119,6 +121,8 @@ pub struct ScanRequest { /// Optional constraint on the minimal sequence number in the SST files. /// If set, only the SST files that contain sequences greater than this value will be scanned. pub sst_min_sequence: Option, + /// Whether to bind the effective snapshot upper bound when opening the scan. + pub snapshot_on_scan: bool, /// Optional hint for the distribution of time-series data. pub distribution: Option, /// Optional hint for KNN vector search. When set, the scan should use @@ -195,6 +199,14 @@ impl Display for ScanRequest { sst_min_sequence )?; } + if self.snapshot_on_scan { + write!( + f, + "{}snapshot_on_scan: {}", + delimiter.as_str(), + self.snapshot_on_scan + )?; + } if let Some(distribution) = &self.distribution { write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?; } @@ -278,5 +290,14 @@ mod tests { request.to_string(), "ScanRequest { force_flat_format: true }" ); + + let request = ScanRequest { + snapshot_on_scan: true, + ..Default::default() + }; + assert_eq!( + request.to_string(), + "ScanRequest { snapshot_on_scan: true }" + ); } } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 83319f2688..02511456ae 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -599,7 +599,12 @@ mod test { .primary_key(vec![1]); let region_metadata = Arc::new(builder.build().unwrap()); - let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata)); + let scanner = Box::new(SinglePartitionScanner::new( + stream, + false, + region_metadata, + None, + )); let plan = RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap(); let actual: SchemaRef = Arc::new( plan.properties