From e6eb272137aa98693c7639b00207e3ee457e99bb Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 16 Mar 2026 18:07:55 +0800 Subject: [PATCH] feat: add fields for inc query in query ctx Signed-off-by: discord9 --- src/common/meta/src/ddl/tests/create_flow.rs | 22 ++ src/common/meta/src/rpc/ddl.rs | 128 ++++++++- src/operator/src/utils.rs | 40 +++ src/query/src/options.rs | 278 +++++++++++++++++++ src/session/src/context.rs | 32 +++ 5 files changed, 489 insertions(+), 11 deletions(-) diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index 8803f98e0d..5b22c81857 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -37,6 +37,8 @@ fn test_query_context() -> QueryContext { timezone: "UTC".to_string(), extensions: HashMap::new(), channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), } } @@ -251,6 +253,10 @@ fn test_create_flow_data_new_format_serialization() { catalog: "new_catalog".to_string(), schema: "new_schema".to_string(), timezone: "America/New_York".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), }; let data = CreateFlowData { @@ -272,6 +278,9 @@ fn test_create_flow_data_new_format_serialization() { assert_eq!(deserialized.flow_context.catalog, "new_catalog"); assert_eq!(deserialized.flow_context.schema, "new_schema"); assert_eq!(deserialized.flow_context.timezone, "America/New_York"); + assert_eq!(deserialized.flow_context.channel, 0); + assert_eq!(deserialized.flow_context.snapshot_seqs, HashMap::new()); + assert_eq!(deserialized.flow_context.sst_min_sequences, HashMap::new()); } #[test] @@ -286,6 +295,8 @@ fn test_flow_query_context_conversion_from_query_context() { ] .into(), channel: 99, + snapshot_seqs: HashMap::from([(1, 10)]), + sst_min_sequences: HashMap::from([(1, 8)]), }; let flow_context: FlowQueryContext = query_context.into(); @@ -293,6 +304,9 @@ fn test_flow_query_context_conversion_from_query_context() { assert_eq!(flow_context.catalog, "prod_catalog"); assert_eq!(flow_context.schema, "public"); assert_eq!(flow_context.timezone, "America/Los_Angeles"); + assert_eq!(flow_context.channel, 99); + assert_eq!(flow_context.snapshot_seqs, HashMap::from([(1, 10)])); + assert_eq!(flow_context.sst_min_sequences, HashMap::from([(1, 8)])); } #[test] @@ -301,6 +315,10 @@ fn test_flow_info_conversion_with_flow_context() { catalog: "info_catalog".to_string(), schema: "info_schema".to_string(), timezone: "Europe/Berlin".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), }; let data = CreateFlowData { @@ -349,6 +367,10 @@ fn test_mixed_serialization_format_support() { catalog: "test".to_string(), schema: "test".to_string(), timezone: "UTC".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), }; assert_eq!(ctx_from_new, expected_new); } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 1e94a8f092..cf96676d26 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -1432,6 +1432,10 @@ pub struct QueryContext { pub timezone: String, pub extensions: HashMap, pub channel: u8, + #[serde(default)] + pub snapshot_seqs: HashMap, + #[serde(default)] + pub sst_min_sequences: HashMap, } impl QueryContext { @@ -1459,6 +1463,14 @@ impl QueryContext { pub fn channel(&self) -> u8 { self.channel } + + pub fn snapshot_seqs(&self) -> &HashMap { + &self.snapshot_seqs + } + + pub fn sst_min_sequences(&self) -> &HashMap { + &self.sst_min_sequences + } } /// Lightweight query context for flow operations containing only essential fields. @@ -1466,12 +1478,17 @@ impl QueryContext { /// for flow creation and execution. #[derive(Debug, Clone, Serialize, PartialEq)] pub struct FlowQueryContext { - /// Current catalog name - needed for flow metadata and recovery pub catalog: String, - /// Current schema name - needed for table resolution during flow execution pub schema: String, - /// Timezone for timestamp operations in the flow pub timezone: String, + #[serde(default)] + pub extensions: HashMap, + #[serde(default)] + pub channel: u8, + #[serde(default)] + pub snapshot_seqs: HashMap, + #[serde(default)] + pub sst_min_sequences: HashMap, } impl<'de> Deserialize<'de> for FlowQueryContext { @@ -1492,6 +1509,14 @@ impl<'de> Deserialize<'de> for FlowQueryContext { catalog: String, schema: String, timezone: String, + #[serde(default)] + extensions: HashMap, + #[serde(default)] + channel: u8, + #[serde(default)] + snapshot_seqs: HashMap, + #[serde(default)] + sst_min_sequences: HashMap, } match ContextCompat::deserialize(deserializer)? { @@ -1499,6 +1524,10 @@ impl<'de> Deserialize<'de> for FlowQueryContext { catalog: helper.catalog, schema: helper.schema, timezone: helper.timezone, + extensions: helper.extensions, + channel: helper.channel, + snapshot_seqs: helper.snapshot_seqs, + sst_min_sequences: helper.sst_min_sequences, }), ContextCompat::Full(full_ctx) => Ok(full_ctx.into()), } @@ -1507,12 +1536,21 @@ impl<'de> Deserialize<'de> for FlowQueryContext { impl From for QueryContext { fn from(pb_ctx: PbQueryContext) -> Self { + let snapshot_sequences = pb_ctx.snapshot_seqs; Self { current_catalog: pb_ctx.current_catalog, current_schema: pb_ctx.current_schema, timezone: pb_ctx.timezone, extensions: pb_ctx.extensions, channel: pb_ctx.channel as u8, + snapshot_seqs: snapshot_sequences + .as_ref() + .map(|x| x.snapshot_seqs.clone()) + .unwrap_or_default(), + sst_min_sequences: snapshot_sequences + .as_ref() + .map(|x| x.sst_min_sequences.clone()) + .unwrap_or_default(), } } } @@ -1525,6 +1563,8 @@ impl From for PbQueryContext { timezone, extensions, channel, + snapshot_seqs, + sst_min_sequences, }: QueryContext, ) -> Self { PbQueryContext { @@ -1533,7 +1573,10 @@ impl From for PbQueryContext { timezone, extensions, channel: channel as u32, - snapshot_seqs: None, + snapshot_seqs: Some(api::v1::SnapshotSequences { + snapshot_seqs, + sst_min_sequences, + }), explain: None, } } @@ -1545,6 +1588,10 @@ impl From for FlowQueryContext { catalog: ctx.current_catalog, schema: ctx.current_schema, timezone: ctx.timezone, + extensions: ctx.extensions, + channel: ctx.channel, + snapshot_seqs: ctx.snapshot_seqs, + sst_min_sequences: ctx.sst_min_sequences, } } } @@ -1555,8 +1602,10 @@ impl From for QueryContext { current_catalog: flow_ctx.catalog, current_schema: flow_ctx.schema, timezone: flow_ctx.timezone, - extensions: HashMap::new(), - channel: 0, // Use default channel for flows + extensions: flow_ctx.extensions, + channel: flow_ctx.channel, + snapshot_seqs: flow_ctx.snapshot_seqs, + sst_min_sequences: flow_ctx.sst_min_sequences, } } } @@ -1720,6 +1769,8 @@ mod tests { timezone: "UTC".to_string(), extensions, channel: 5, + snapshot_seqs: HashMap::from([(10, 100)]), + sst_min_sequences: HashMap::from([(10, 90)]), }; let flow_ctx: FlowQueryContext = query_ctx.into(); @@ -1727,6 +1778,9 @@ mod tests { assert_eq!(flow_ctx.catalog, "test_catalog"); assert_eq!(flow_ctx.schema, "test_schema"); assert_eq!(flow_ctx.timezone, "UTC"); + assert_eq!(flow_ctx.channel, 5); + assert_eq!(flow_ctx.snapshot_seqs, HashMap::from([(10, 100)])); + assert_eq!(flow_ctx.sst_min_sequences, HashMap::from([(10, 90)])); } #[test] @@ -1735,6 +1789,10 @@ mod tests { catalog: "prod_catalog".to_string(), schema: "public".to_string(), timezone: "America/New_York".to_string(), + extensions: HashMap::from([("k".to_string(), "v".to_string())]), + channel: 7, + snapshot_seqs: HashMap::from([(11, 111)]), + sst_min_sequences: HashMap::from([(11, 101)]), }; let query_ctx: QueryContext = flow_ctx.clone().into(); @@ -1742,8 +1800,13 @@ mod tests { assert_eq!(query_ctx.current_catalog, "prod_catalog"); assert_eq!(query_ctx.current_schema, "public"); assert_eq!(query_ctx.timezone, "America/New_York"); - assert!(query_ctx.extensions.is_empty()); - assert_eq!(query_ctx.channel, 0); + assert_eq!( + query_ctx.extensions, + HashMap::from([("k".to_string(), "v".to_string())]) + ); + assert_eq!(query_ctx.channel, 7); + assert_eq!(query_ctx.snapshot_seqs, HashMap::from([(11, 111)])); + assert_eq!(query_ctx.sst_min_sequences, HashMap::from([(11, 101)])); // Test roundtrip conversion let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into(); @@ -1756,6 +1819,10 @@ mod tests { catalog: "test_catalog".to_string(), schema: "test_schema".to_string(), timezone: "UTC".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), }; let serialized = serde_json::to_string(&flow_ctx).unwrap(); @@ -1776,6 +1843,10 @@ mod tests { catalog: "pb_catalog".to_string(), schema: "pb_schema".to_string(), timezone: "Asia/Tokyo".to_string(), + extensions: HashMap::from([("x".to_string(), "y".to_string())]), + channel: 6, + snapshot_seqs: HashMap::from([(3, 30)]), + sst_min_sequences: HashMap::from([(3, 21)]), }; let pb_ctx: PbQueryContext = flow_ctx.into(); @@ -1783,9 +1854,44 @@ mod tests { assert_eq!(pb_ctx.current_catalog, "pb_catalog"); assert_eq!(pb_ctx.current_schema, "pb_schema"); assert_eq!(pb_ctx.timezone, "Asia/Tokyo"); - assert!(pb_ctx.extensions.is_empty()); - assert_eq!(pb_ctx.channel, 0); - assert!(pb_ctx.snapshot_seqs.is_none()); + assert_eq!( + pb_ctx.extensions, + HashMap::from([("x".to_string(), "y".to_string())]) + ); + assert_eq!(pb_ctx.channel, 6); + assert_eq!( + pb_ctx.snapshot_seqs, + Some(api::v1::SnapshotSequences { + snapshot_seqs: HashMap::from([(3, 30)]), + sst_min_sequences: HashMap::from([(3, 21)]), + }) + ); assert!(pb_ctx.explain.is_none()); } + + #[test] + fn test_pb_query_context_roundtrip_with_snapshot_sequences() { + let pb = PbQueryContext { + current_catalog: "c1".to_string(), + current_schema: "s1".to_string(), + timezone: "UTC".to_string(), + extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]), + channel: 3, + snapshot_seqs: Some(api::v1::SnapshotSequences { + snapshot_seqs: HashMap::from([(1, 100)]), + sst_min_sequences: HashMap::from([(1, 90)]), + }), + explain: None, + }; + + let query_ctx: QueryContext = pb.clone().into(); + let pb_roundtrip: PbQueryContext = query_ctx.into(); + + assert_eq!(pb_roundtrip.current_catalog, pb.current_catalog); + assert_eq!(pb_roundtrip.current_schema, pb.current_schema); + assert_eq!(pb_roundtrip.timezone, pb.timezone); + assert_eq!(pb_roundtrip.extensions, pb.extensions); + assert_eq!(pb_roundtrip.channel, pb.channel); + assert_eq!(pb_roundtrip.snapshot_seqs, pb.snapshot_seqs); + } } diff --git a/src/operator/src/utils.rs b/src/operator/src/utils.rs index 93da5f028e..6e9386b3fa 100644 --- a/src/operator/src/utils.rs +++ b/src/operator/src/utils.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::{Arc, RwLock}; + use common_time::Timezone; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::ResultExt; @@ -27,6 +29,8 @@ pub fn to_meta_query_context( timezone: query_context.timezone().to_string(), extensions: query_context.extensions(), channel: query_context.channel() as u8, + snapshot_seqs: query_context.snapshots(), + sst_min_sequences: query_context.sst_min_sequences(), } } @@ -43,5 +47,41 @@ pub fn try_to_session_query_context( ) .extensions(value.extensions) .channel((value.channel as u32).into()) + .snapshot_seqs(Arc::new(RwLock::new(value.snapshot_seqs))) + .sst_min_sequences(Arc::new(RwLock::new(value.sst_min_sequences))) .build()) } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::{Arc, RwLock}; + + use common_time::Timezone; + use session::context::QueryContextBuilder; + + use super::{to_meta_query_context, try_to_session_query_context}; + + #[test] + fn test_query_context_meta_roundtrip_with_sequences() { + let session_ctx = Arc::new( + QueryContextBuilder::default() + .current_catalog("c1".to_string()) + .current_schema("s1".to_string()) + .timezone(Timezone::from_tz_string("UTC").unwrap()) + .set_extension("flow.return_region_seq".to_string(), "true".to_string()) + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(10, 100)])))) + .sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(10, 90)])))) + .build(), + ); + + let meta_ctx = to_meta_query_context(session_ctx); + let roundtrip = try_to_session_query_context(meta_ctx).unwrap(); + + assert_eq!(roundtrip.current_catalog(), "c1"); + assert_eq!(roundtrip.current_schema(), "s1"); + assert_eq!(roundtrip.snapshots(), HashMap::from([(10, 100)])); + assert_eq!(roundtrip.sst_min_sequences(), HashMap::from([(10, 90)])); + assert_eq!(roundtrip.extension("flow.return_region_seq"), Some("true")); + } +} diff --git a/src/query/src/options.rs b/src/query/src/options.rs index 50ca1177a5..f807c1c16a 100644 --- a/src/query/src/options.rs +++ b/src/query/src/options.rs @@ -12,8 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use common_base::memory_limit::MemoryLimit; use serde::{Deserialize, Serialize}; +use table::metadata::TableId; + +pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs"; +pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode"; +pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq"; +pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id"; + +const FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY: &str = "memtable_only"; /// Query engine config #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -39,3 +49,271 @@ impl Default for QueryOptions { } } } + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FlowIncrementalMode { + MemtableOnly, +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct FlowQueryExtensions { + pub incremental_after_seqs: Option>, + pub incremental_mode: Option, + pub return_region_seq: bool, + pub sink_table_id: Option, +} + +impl FlowQueryExtensions { + pub fn from_extensions(extensions: &HashMap) -> Result { + let incremental_mode = extensions + .get(FLOW_INCREMENTAL_MODE) + .map(|value| match value.as_str() { + v if v.eq_ignore_ascii_case(FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY) => { + Ok(FlowIncrementalMode::MemtableOnly) + } + _ => Err(format!( + "Invalid value for {}: {}", + FLOW_INCREMENTAL_MODE, value + )), + }) + .transpose()?; + + let incremental_after_seqs = extensions + .get(FLOW_INCREMENTAL_AFTER_SEQS) + .map(|value| parse_incremental_after_seqs(value.as_str())) + .transpose()?; + + let return_region_seq = extensions + .get(FLOW_RETURN_REGION_SEQ) + .map(|value| parse_bool(value.as_str())) + .transpose()? + .unwrap_or(false); + + let sink_table_id = extensions + .get(FLOW_SINK_TABLE_ID) + .map(|value| { + value + .parse::() + .map_err(|_| format!("Invalid value for {}: {}", FLOW_SINK_TABLE_ID, value)) + }) + .transpose()?; + + if matches!(incremental_mode, Some(FlowIncrementalMode::MemtableOnly)) { + let after_seqs = incremental_after_seqs.as_ref().ok_or_else(|| { + format!( + "{} is required when {}={}.", + FLOW_INCREMENTAL_AFTER_SEQS, + FLOW_INCREMENTAL_MODE, + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY + ) + })?; + if after_seqs.is_empty() { + return Err(format!( + "{} must not be empty when {}={}.", + FLOW_INCREMENTAL_AFTER_SEQS, + FLOW_INCREMENTAL_MODE, + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY + )); + } + } + + Ok(Self { + incremental_after_seqs, + incremental_mode, + return_region_seq, + sink_table_id, + }) + } + + pub fn validate_for_scan( + &self, + source_region_ids: &[u64], + current_scan_table_id: Option, + ) -> Result { + if self.sink_table_id.is_some() && self.sink_table_id == current_scan_table_id { + return Ok(false); + } + + if matches!( + self.incremental_mode, + Some(FlowIncrementalMode::MemtableOnly) + ) { + let after_seqs = self.incremental_after_seqs.as_ref().ok_or_else(|| { + format!( + "{} is required when {}=memtable_only.", + FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE + ) + })?; + + for region_id in source_region_ids { + if !after_seqs.contains_key(region_id) { + return Err(format!( + "Missing region {} in {} when {}=memtable_only.", + region_id, FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE + )); + } + } + } + + Ok(self.incremental_after_seqs.is_some()) + } +} + +fn parse_incremental_after_seqs(value: &str) -> Result, String> { + let raw = serde_json::from_str::>(value).map_err(|e| { + format!( + "Invalid JSON for {}: {} ({})", + FLOW_INCREMENTAL_AFTER_SEQS, value, e + ) + })?; + + raw.into_iter() + .map(|(region_id, seq)| { + region_id + .parse::() + .map(|region_id| (region_id, seq)) + .map_err(|_| { + format!( + "Invalid region id in {}: {}", + FLOW_INCREMENTAL_AFTER_SEQS, region_id + ) + }) + }) + .collect() +} + +fn parse_bool(value: &str) -> Result { + match value { + v if v.eq_ignore_ascii_case("true") => Ok(true), + v if v.eq_ignore_ascii_case("false") => Ok(false), + _ => Err(format!( + "Invalid value for {}: {}", + FLOW_RETURN_REGION_SEQ, value + )), + } +} + +#[cfg(test)] +mod flow_extension_tests { + use super::*; + + #[test] + fn test_parse_flow_extensions_default() { + let exts = HashMap::new(); + let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + + assert_eq!(parsed.incremental_mode, None); + assert_eq!(parsed.incremental_after_seqs, None); + assert!(!parsed.return_region_seq); + assert_eq!(parsed.sink_table_id, None); + } + + #[test] + fn test_parse_flow_extensions_memtable_only_success() { + let exts = HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":10,"2":20}"#.to_string(), + ), + (FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string()), + (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()), + ]); + + let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + assert_eq!( + parsed.incremental_mode, + Some(FlowIncrementalMode::MemtableOnly) + ); + assert_eq!( + parsed.incremental_after_seqs.unwrap(), + HashMap::from([(1, 10), (2, 20)]) + ); + assert!(parsed.return_region_seq); + assert_eq!(parsed.sink_table_id, Some(1024)); + } + + #[test] + fn test_parse_flow_extensions_mode_requires_after_seqs() { + let exts = HashMap::from([( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + )]); + + let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + assert!(err.contains(FLOW_INCREMENTAL_AFTER_SEQS)); + } + + #[test] + fn test_parse_flow_extensions_invalid_mode() { + let exts = HashMap::from([(FLOW_INCREMENTAL_MODE.to_string(), "foo".to_string())]); + + let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + assert!(err.contains(FLOW_INCREMENTAL_MODE)); + } + + #[test] + fn test_parse_flow_extensions_invalid_after_seqs_json() { + let exts = HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + "not-json".to_string(), + ), + ]); + + let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + assert!(err.contains(FLOW_INCREMENTAL_AFTER_SEQS)); + } + + #[test] + fn test_parse_flow_extensions_invalid_sink_table_id() { + let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "x".to_string())]); + + let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err(); + assert!(err.contains(FLOW_SINK_TABLE_ID)); + } + + #[test] + fn test_validate_for_scan_missing_source_region() { + let exts = HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":10}"#.to_string(), + ), + ]); + + let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let err = parsed.validate_for_scan(&[1, 2], Some(100)).unwrap_err(); + assert!(err.contains("Missing region 2")); + } + + #[test] + fn test_validate_for_scan_sink_table_excluded() { + let exts = HashMap::from([ + ( + FLOW_INCREMENTAL_MODE.to_string(), + FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(), + ), + ( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":10}"#.to_string(), + ), + (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()), + ]); + + let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap(); + let apply_incremental = parsed.validate_for_scan(&[1, 2], Some(1024)).unwrap(); + assert!(!apply_incremental); + } +} diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 2b9483aca8..56a38c9d72 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -433,6 +433,10 @@ impl QueryContext { self.snapshot_seqs.read().unwrap().clone() } + pub fn sst_min_sequences(&self) -> HashMap { + self.sst_min_sequences.read().unwrap().clone() + } + pub fn get_snapshot(&self, region_id: u64) -> Option { self.snapshot_seqs.read().unwrap().get(®ion_id).cloned() } @@ -669,6 +673,8 @@ impl ConfigurationVariables { #[cfg(test)] mod test { + use std::collections::HashMap; + use common_catalog::consts::DEFAULT_CATALOG_NAME; use super::*; @@ -704,4 +710,30 @@ mod test { let context = QueryContext::with(DEFAULT_CATALOG_NAME, "test"); assert_eq!("test", context.get_db_string()); } + + #[test] + fn test_api_query_context_roundtrip_with_sequences() { + let api_ctx = api::v1::QueryContext { + current_catalog: "c1".to_string(), + current_schema: "s1".to_string(), + timezone: "UTC".to_string(), + extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]), + channel: Channel::Grpc as u32, + snapshot_seqs: Some(api::v1::SnapshotSequences { + snapshot_seqs: HashMap::from([(1, 100)]), + sst_min_sequences: HashMap::from([(1, 90)]), + }), + explain: None, + }; + + let session_ctx: QueryContext = api_ctx.clone().into(); + let roundtrip_api: api::v1::QueryContext = session_ctx.into(); + + assert_eq!(roundtrip_api.current_catalog, api_ctx.current_catalog); + assert_eq!(roundtrip_api.current_schema, api_ctx.current_schema); + assert_eq!(roundtrip_api.timezone, api_ctx.timezone); + assert_eq!(roundtrip_api.extensions, api_ctx.extensions); + assert_eq!(roundtrip_api.channel, api_ctx.channel); + assert_eq!(roundtrip_api.snapshot_seqs, api_ctx.snapshot_seqs); + } }