mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-13 19:40:39 +00:00
feat: add fields for inc query in query ctx
Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -37,6 +37,8 @@ fn test_query_context() -> QueryContext {
|
||||
timezone: "UTC".to_string(),
|
||||
extensions: HashMap::new(),
|
||||
channel: 0,
|
||||
snapshot_seqs: HashMap::new(),
|
||||
sst_min_sequences: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -251,6 +253,10 @@ fn test_create_flow_data_new_format_serialization() {
|
||||
catalog: "new_catalog".to_string(),
|
||||
schema: "new_schema".to_string(),
|
||||
timezone: "America/New_York".to_string(),
|
||||
extensions: HashMap::new(),
|
||||
channel: 0,
|
||||
snapshot_seqs: HashMap::new(),
|
||||
sst_min_sequences: HashMap::new(),
|
||||
};
|
||||
|
||||
let data = CreateFlowData {
|
||||
@@ -272,6 +278,9 @@ fn test_create_flow_data_new_format_serialization() {
|
||||
assert_eq!(deserialized.flow_context.catalog, "new_catalog");
|
||||
assert_eq!(deserialized.flow_context.schema, "new_schema");
|
||||
assert_eq!(deserialized.flow_context.timezone, "America/New_York");
|
||||
assert_eq!(deserialized.flow_context.channel, 0);
|
||||
assert_eq!(deserialized.flow_context.snapshot_seqs, HashMap::new());
|
||||
assert_eq!(deserialized.flow_context.sst_min_sequences, HashMap::new());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -286,6 +295,8 @@ fn test_flow_query_context_conversion_from_query_context() {
|
||||
]
|
||||
.into(),
|
||||
channel: 99,
|
||||
snapshot_seqs: HashMap::from([(1, 10)]),
|
||||
sst_min_sequences: HashMap::from([(1, 8)]),
|
||||
};
|
||||
|
||||
let flow_context: FlowQueryContext = query_context.into();
|
||||
@@ -293,6 +304,9 @@ fn test_flow_query_context_conversion_from_query_context() {
|
||||
assert_eq!(flow_context.catalog, "prod_catalog");
|
||||
assert_eq!(flow_context.schema, "public");
|
||||
assert_eq!(flow_context.timezone, "America/Los_Angeles");
|
||||
assert_eq!(flow_context.channel, 99);
|
||||
assert_eq!(flow_context.snapshot_seqs, HashMap::from([(1, 10)]));
|
||||
assert_eq!(flow_context.sst_min_sequences, HashMap::from([(1, 8)]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -301,6 +315,10 @@ fn test_flow_info_conversion_with_flow_context() {
|
||||
catalog: "info_catalog".to_string(),
|
||||
schema: "info_schema".to_string(),
|
||||
timezone: "Europe/Berlin".to_string(),
|
||||
extensions: HashMap::new(),
|
||||
channel: 0,
|
||||
snapshot_seqs: HashMap::new(),
|
||||
sst_min_sequences: HashMap::new(),
|
||||
};
|
||||
|
||||
let data = CreateFlowData {
|
||||
@@ -349,6 +367,10 @@ fn test_mixed_serialization_format_support() {
|
||||
catalog: "test".to_string(),
|
||||
schema: "test".to_string(),
|
||||
timezone: "UTC".to_string(),
|
||||
extensions: HashMap::new(),
|
||||
channel: 0,
|
||||
snapshot_seqs: HashMap::new(),
|
||||
sst_min_sequences: HashMap::new(),
|
||||
};
|
||||
assert_eq!(ctx_from_new, expected_new);
|
||||
}
|
||||
|
||||
@@ -1432,6 +1432,10 @@ pub struct QueryContext {
|
||||
pub timezone: String,
|
||||
pub extensions: HashMap<String, String>,
|
||||
pub channel: u8,
|
||||
#[serde(default)]
|
||||
pub snapshot_seqs: HashMap<u64, u64>,
|
||||
#[serde(default)]
|
||||
pub sst_min_sequences: HashMap<u64, u64>,
|
||||
}
|
||||
|
||||
impl QueryContext {
|
||||
@@ -1459,6 +1463,14 @@ impl QueryContext {
|
||||
pub fn channel(&self) -> u8 {
|
||||
self.channel
|
||||
}
|
||||
|
||||
pub fn snapshot_seqs(&self) -> &HashMap<u64, u64> {
|
||||
&self.snapshot_seqs
|
||||
}
|
||||
|
||||
pub fn sst_min_sequences(&self) -> &HashMap<u64, u64> {
|
||||
&self.sst_min_sequences
|
||||
}
|
||||
}
|
||||
|
||||
/// Lightweight query context for flow operations containing only essential fields.
|
||||
@@ -1466,12 +1478,17 @@ impl QueryContext {
|
||||
/// for flow creation and execution.
|
||||
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||
pub struct FlowQueryContext {
|
||||
/// Current catalog name - needed for flow metadata and recovery
|
||||
pub catalog: String,
|
||||
/// Current schema name - needed for table resolution during flow execution
|
||||
pub schema: String,
|
||||
/// Timezone for timestamp operations in the flow
|
||||
pub timezone: String,
|
||||
#[serde(default)]
|
||||
pub extensions: HashMap<String, String>,
|
||||
#[serde(default)]
|
||||
pub channel: u8,
|
||||
#[serde(default)]
|
||||
pub snapshot_seqs: HashMap<u64, u64>,
|
||||
#[serde(default)]
|
||||
pub sst_min_sequences: HashMap<u64, u64>,
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for FlowQueryContext {
|
||||
@@ -1492,6 +1509,14 @@ impl<'de> Deserialize<'de> for FlowQueryContext {
|
||||
catalog: String,
|
||||
schema: String,
|
||||
timezone: String,
|
||||
#[serde(default)]
|
||||
extensions: HashMap<String, String>,
|
||||
#[serde(default)]
|
||||
channel: u8,
|
||||
#[serde(default)]
|
||||
snapshot_seqs: HashMap<u64, u64>,
|
||||
#[serde(default)]
|
||||
sst_min_sequences: HashMap<u64, u64>,
|
||||
}
|
||||
|
||||
match ContextCompat::deserialize(deserializer)? {
|
||||
@@ -1499,6 +1524,10 @@ impl<'de> Deserialize<'de> for FlowQueryContext {
|
||||
catalog: helper.catalog,
|
||||
schema: helper.schema,
|
||||
timezone: helper.timezone,
|
||||
extensions: helper.extensions,
|
||||
channel: helper.channel,
|
||||
snapshot_seqs: helper.snapshot_seqs,
|
||||
sst_min_sequences: helper.sst_min_sequences,
|
||||
}),
|
||||
ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
|
||||
}
|
||||
@@ -1507,12 +1536,21 @@ impl<'de> Deserialize<'de> for FlowQueryContext {
|
||||
|
||||
impl From<PbQueryContext> for QueryContext {
|
||||
fn from(pb_ctx: PbQueryContext) -> Self {
|
||||
let snapshot_sequences = pb_ctx.snapshot_seqs;
|
||||
Self {
|
||||
current_catalog: pb_ctx.current_catalog,
|
||||
current_schema: pb_ctx.current_schema,
|
||||
timezone: pb_ctx.timezone,
|
||||
extensions: pb_ctx.extensions,
|
||||
channel: pb_ctx.channel as u8,
|
||||
snapshot_seqs: snapshot_sequences
|
||||
.as_ref()
|
||||
.map(|x| x.snapshot_seqs.clone())
|
||||
.unwrap_or_default(),
|
||||
sst_min_sequences: snapshot_sequences
|
||||
.as_ref()
|
||||
.map(|x| x.sst_min_sequences.clone())
|
||||
.unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1525,6 +1563,8 @@ impl From<QueryContext> for PbQueryContext {
|
||||
timezone,
|
||||
extensions,
|
||||
channel,
|
||||
snapshot_seqs,
|
||||
sst_min_sequences,
|
||||
}: QueryContext,
|
||||
) -> Self {
|
||||
PbQueryContext {
|
||||
@@ -1533,7 +1573,10 @@ impl From<QueryContext> for PbQueryContext {
|
||||
timezone,
|
||||
extensions,
|
||||
channel: channel as u32,
|
||||
snapshot_seqs: None,
|
||||
snapshot_seqs: Some(api::v1::SnapshotSequences {
|
||||
snapshot_seqs,
|
||||
sst_min_sequences,
|
||||
}),
|
||||
explain: None,
|
||||
}
|
||||
}
|
||||
@@ -1545,6 +1588,10 @@ impl From<QueryContext> for FlowQueryContext {
|
||||
catalog: ctx.current_catalog,
|
||||
schema: ctx.current_schema,
|
||||
timezone: ctx.timezone,
|
||||
extensions: ctx.extensions,
|
||||
channel: ctx.channel,
|
||||
snapshot_seqs: ctx.snapshot_seqs,
|
||||
sst_min_sequences: ctx.sst_min_sequences,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1555,8 +1602,10 @@ impl From<FlowQueryContext> for QueryContext {
|
||||
current_catalog: flow_ctx.catalog,
|
||||
current_schema: flow_ctx.schema,
|
||||
timezone: flow_ctx.timezone,
|
||||
extensions: HashMap::new(),
|
||||
channel: 0, // Use default channel for flows
|
||||
extensions: flow_ctx.extensions,
|
||||
channel: flow_ctx.channel,
|
||||
snapshot_seqs: flow_ctx.snapshot_seqs,
|
||||
sst_min_sequences: flow_ctx.sst_min_sequences,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1720,6 +1769,8 @@ mod tests {
|
||||
timezone: "UTC".to_string(),
|
||||
extensions,
|
||||
channel: 5,
|
||||
snapshot_seqs: HashMap::from([(10, 100)]),
|
||||
sst_min_sequences: HashMap::from([(10, 90)]),
|
||||
};
|
||||
|
||||
let flow_ctx: FlowQueryContext = query_ctx.into();
|
||||
@@ -1727,6 +1778,9 @@ mod tests {
|
||||
assert_eq!(flow_ctx.catalog, "test_catalog");
|
||||
assert_eq!(flow_ctx.schema, "test_schema");
|
||||
assert_eq!(flow_ctx.timezone, "UTC");
|
||||
assert_eq!(flow_ctx.channel, 5);
|
||||
assert_eq!(flow_ctx.snapshot_seqs, HashMap::from([(10, 100)]));
|
||||
assert_eq!(flow_ctx.sst_min_sequences, HashMap::from([(10, 90)]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1735,6 +1789,10 @@ mod tests {
|
||||
catalog: "prod_catalog".to_string(),
|
||||
schema: "public".to_string(),
|
||||
timezone: "America/New_York".to_string(),
|
||||
extensions: HashMap::from([("k".to_string(), "v".to_string())]),
|
||||
channel: 7,
|
||||
snapshot_seqs: HashMap::from([(11, 111)]),
|
||||
sst_min_sequences: HashMap::from([(11, 101)]),
|
||||
};
|
||||
|
||||
let query_ctx: QueryContext = flow_ctx.clone().into();
|
||||
@@ -1742,8 +1800,13 @@ mod tests {
|
||||
assert_eq!(query_ctx.current_catalog, "prod_catalog");
|
||||
assert_eq!(query_ctx.current_schema, "public");
|
||||
assert_eq!(query_ctx.timezone, "America/New_York");
|
||||
assert!(query_ctx.extensions.is_empty());
|
||||
assert_eq!(query_ctx.channel, 0);
|
||||
assert_eq!(
|
||||
query_ctx.extensions,
|
||||
HashMap::from([("k".to_string(), "v".to_string())])
|
||||
);
|
||||
assert_eq!(query_ctx.channel, 7);
|
||||
assert_eq!(query_ctx.snapshot_seqs, HashMap::from([(11, 111)]));
|
||||
assert_eq!(query_ctx.sst_min_sequences, HashMap::from([(11, 101)]));
|
||||
|
||||
// Test roundtrip conversion
|
||||
let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
|
||||
@@ -1756,6 +1819,10 @@ mod tests {
|
||||
catalog: "test_catalog".to_string(),
|
||||
schema: "test_schema".to_string(),
|
||||
timezone: "UTC".to_string(),
|
||||
extensions: HashMap::new(),
|
||||
channel: 0,
|
||||
snapshot_seqs: HashMap::new(),
|
||||
sst_min_sequences: HashMap::new(),
|
||||
};
|
||||
|
||||
let serialized = serde_json::to_string(&flow_ctx).unwrap();
|
||||
@@ -1776,6 +1843,10 @@ mod tests {
|
||||
catalog: "pb_catalog".to_string(),
|
||||
schema: "pb_schema".to_string(),
|
||||
timezone: "Asia/Tokyo".to_string(),
|
||||
extensions: HashMap::from([("x".to_string(), "y".to_string())]),
|
||||
channel: 6,
|
||||
snapshot_seqs: HashMap::from([(3, 30)]),
|
||||
sst_min_sequences: HashMap::from([(3, 21)]),
|
||||
};
|
||||
|
||||
let pb_ctx: PbQueryContext = flow_ctx.into();
|
||||
@@ -1783,9 +1854,44 @@ mod tests {
|
||||
assert_eq!(pb_ctx.current_catalog, "pb_catalog");
|
||||
assert_eq!(pb_ctx.current_schema, "pb_schema");
|
||||
assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
|
||||
assert!(pb_ctx.extensions.is_empty());
|
||||
assert_eq!(pb_ctx.channel, 0);
|
||||
assert!(pb_ctx.snapshot_seqs.is_none());
|
||||
assert_eq!(
|
||||
pb_ctx.extensions,
|
||||
HashMap::from([("x".to_string(), "y".to_string())])
|
||||
);
|
||||
assert_eq!(pb_ctx.channel, 6);
|
||||
assert_eq!(
|
||||
pb_ctx.snapshot_seqs,
|
||||
Some(api::v1::SnapshotSequences {
|
||||
snapshot_seqs: HashMap::from([(3, 30)]),
|
||||
sst_min_sequences: HashMap::from([(3, 21)]),
|
||||
})
|
||||
);
|
||||
assert!(pb_ctx.explain.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pb_query_context_roundtrip_with_snapshot_sequences() {
|
||||
let pb = PbQueryContext {
|
||||
current_catalog: "c1".to_string(),
|
||||
current_schema: "s1".to_string(),
|
||||
timezone: "UTC".to_string(),
|
||||
extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]),
|
||||
channel: 3,
|
||||
snapshot_seqs: Some(api::v1::SnapshotSequences {
|
||||
snapshot_seqs: HashMap::from([(1, 100)]),
|
||||
sst_min_sequences: HashMap::from([(1, 90)]),
|
||||
}),
|
||||
explain: None,
|
||||
};
|
||||
|
||||
let query_ctx: QueryContext = pb.clone().into();
|
||||
let pb_roundtrip: PbQueryContext = query_ctx.into();
|
||||
|
||||
assert_eq!(pb_roundtrip.current_catalog, pb.current_catalog);
|
||||
assert_eq!(pb_roundtrip.current_schema, pb.current_schema);
|
||||
assert_eq!(pb_roundtrip.timezone, pb.timezone);
|
||||
assert_eq!(pb_roundtrip.extensions, pb.extensions);
|
||||
assert_eq!(pb_roundtrip.channel, pb.channel);
|
||||
assert_eq!(pb_roundtrip.snapshot_seqs, pb.snapshot_seqs);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use common_time::Timezone;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::ResultExt;
|
||||
@@ -27,6 +29,8 @@ pub fn to_meta_query_context(
|
||||
timezone: query_context.timezone().to_string(),
|
||||
extensions: query_context.extensions(),
|
||||
channel: query_context.channel() as u8,
|
||||
snapshot_seqs: query_context.snapshots(),
|
||||
sst_min_sequences: query_context.sst_min_sequences(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,5 +47,41 @@ pub fn try_to_session_query_context(
|
||||
)
|
||||
.extensions(value.extensions)
|
||||
.channel((value.channel as u32).into())
|
||||
.snapshot_seqs(Arc::new(RwLock::new(value.snapshot_seqs)))
|
||||
.sst_min_sequences(Arc::new(RwLock::new(value.sst_min_sequences)))
|
||||
.build())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use common_time::Timezone;
|
||||
use session::context::QueryContextBuilder;
|
||||
|
||||
use super::{to_meta_query_context, try_to_session_query_context};
|
||||
|
||||
#[test]
|
||||
fn test_query_context_meta_roundtrip_with_sequences() {
|
||||
let session_ctx = Arc::new(
|
||||
QueryContextBuilder::default()
|
||||
.current_catalog("c1".to_string())
|
||||
.current_schema("s1".to_string())
|
||||
.timezone(Timezone::from_tz_string("UTC").unwrap())
|
||||
.set_extension("flow.return_region_seq".to_string(), "true".to_string())
|
||||
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(10, 100)]))))
|
||||
.sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(10, 90)]))))
|
||||
.build(),
|
||||
);
|
||||
|
||||
let meta_ctx = to_meta_query_context(session_ctx);
|
||||
let roundtrip = try_to_session_query_context(meta_ctx).unwrap();
|
||||
|
||||
assert_eq!(roundtrip.current_catalog(), "c1");
|
||||
assert_eq!(roundtrip.current_schema(), "s1");
|
||||
assert_eq!(roundtrip.snapshots(), HashMap::from([(10, 100)]));
|
||||
assert_eq!(roundtrip.sst_min_sequences(), HashMap::from([(10, 90)]));
|
||||
assert_eq!(roundtrip.extension("flow.return_region_seq"), Some("true"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,8 +12,18 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_base::memory_limit::MemoryLimit;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use table::metadata::TableId;
|
||||
|
||||
pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs";
|
||||
pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode";
|
||||
pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq";
|
||||
pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id";
|
||||
|
||||
const FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY: &str = "memtable_only";
|
||||
|
||||
/// Query engine config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
@@ -39,3 +49,271 @@ impl Default for QueryOptions {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum FlowIncrementalMode {
|
||||
MemtableOnly,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default)]
|
||||
pub struct FlowQueryExtensions {
|
||||
pub incremental_after_seqs: Option<HashMap<u64, u64>>,
|
||||
pub incremental_mode: Option<FlowIncrementalMode>,
|
||||
pub return_region_seq: bool,
|
||||
pub sink_table_id: Option<TableId>,
|
||||
}
|
||||
|
||||
impl FlowQueryExtensions {
|
||||
pub fn from_extensions(extensions: &HashMap<String, String>) -> Result<Self, String> {
|
||||
let incremental_mode = extensions
|
||||
.get(FLOW_INCREMENTAL_MODE)
|
||||
.map(|value| match value.as_str() {
|
||||
v if v.eq_ignore_ascii_case(FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY) => {
|
||||
Ok(FlowIncrementalMode::MemtableOnly)
|
||||
}
|
||||
_ => Err(format!(
|
||||
"Invalid value for {}: {}",
|
||||
FLOW_INCREMENTAL_MODE, value
|
||||
)),
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
let incremental_after_seqs = extensions
|
||||
.get(FLOW_INCREMENTAL_AFTER_SEQS)
|
||||
.map(|value| parse_incremental_after_seqs(value.as_str()))
|
||||
.transpose()?;
|
||||
|
||||
let return_region_seq = extensions
|
||||
.get(FLOW_RETURN_REGION_SEQ)
|
||||
.map(|value| parse_bool(value.as_str()))
|
||||
.transpose()?
|
||||
.unwrap_or(false);
|
||||
|
||||
let sink_table_id = extensions
|
||||
.get(FLOW_SINK_TABLE_ID)
|
||||
.map(|value| {
|
||||
value
|
||||
.parse::<TableId>()
|
||||
.map_err(|_| format!("Invalid value for {}: {}", FLOW_SINK_TABLE_ID, value))
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
if matches!(incremental_mode, Some(FlowIncrementalMode::MemtableOnly)) {
|
||||
let after_seqs = incremental_after_seqs.as_ref().ok_or_else(|| {
|
||||
format!(
|
||||
"{} is required when {}={}.",
|
||||
FLOW_INCREMENTAL_AFTER_SEQS,
|
||||
FLOW_INCREMENTAL_MODE,
|
||||
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
|
||||
)
|
||||
})?;
|
||||
if after_seqs.is_empty() {
|
||||
return Err(format!(
|
||||
"{} must not be empty when {}={}.",
|
||||
FLOW_INCREMENTAL_AFTER_SEQS,
|
||||
FLOW_INCREMENTAL_MODE,
|
||||
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
incremental_after_seqs,
|
||||
incremental_mode,
|
||||
return_region_seq,
|
||||
sink_table_id,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn validate_for_scan(
|
||||
&self,
|
||||
source_region_ids: &[u64],
|
||||
current_scan_table_id: Option<TableId>,
|
||||
) -> Result<bool, String> {
|
||||
if self.sink_table_id.is_some() && self.sink_table_id == current_scan_table_id {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if matches!(
|
||||
self.incremental_mode,
|
||||
Some(FlowIncrementalMode::MemtableOnly)
|
||||
) {
|
||||
let after_seqs = self.incremental_after_seqs.as_ref().ok_or_else(|| {
|
||||
format!(
|
||||
"{} is required when {}=memtable_only.",
|
||||
FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
|
||||
)
|
||||
})?;
|
||||
|
||||
for region_id in source_region_ids {
|
||||
if !after_seqs.contains_key(region_id) {
|
||||
return Err(format!(
|
||||
"Missing region {} in {} when {}=memtable_only.",
|
||||
region_id, FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(self.incremental_after_seqs.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_incremental_after_seqs(value: &str) -> Result<HashMap<u64, u64>, String> {
|
||||
let raw = serde_json::from_str::<HashMap<String, u64>>(value).map_err(|e| {
|
||||
format!(
|
||||
"Invalid JSON for {}: {} ({})",
|
||||
FLOW_INCREMENTAL_AFTER_SEQS, value, e
|
||||
)
|
||||
})?;
|
||||
|
||||
raw.into_iter()
|
||||
.map(|(region_id, seq)| {
|
||||
region_id
|
||||
.parse::<u64>()
|
||||
.map(|region_id| (region_id, seq))
|
||||
.map_err(|_| {
|
||||
format!(
|
||||
"Invalid region id in {}: {}",
|
||||
FLOW_INCREMENTAL_AFTER_SEQS, region_id
|
||||
)
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn parse_bool(value: &str) -> Result<bool, String> {
|
||||
match value {
|
||||
v if v.eq_ignore_ascii_case("true") => Ok(true),
|
||||
v if v.eq_ignore_ascii_case("false") => Ok(false),
|
||||
_ => Err(format!(
|
||||
"Invalid value for {}: {}",
|
||||
FLOW_RETURN_REGION_SEQ, value
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod flow_extension_tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_default() {
|
||||
let exts = HashMap::new();
|
||||
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
|
||||
|
||||
assert_eq!(parsed.incremental_mode, None);
|
||||
assert_eq!(parsed.incremental_after_seqs, None);
|
||||
assert!(!parsed.return_region_seq);
|
||||
assert_eq!(parsed.sink_table_id, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_memtable_only_success() {
|
||||
let exts = HashMap::from([
|
||||
(
|
||||
FLOW_INCREMENTAL_MODE.to_string(),
|
||||
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
|
||||
),
|
||||
(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
r#"{"1":10,"2":20}"#.to_string(),
|
||||
),
|
||||
(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string()),
|
||||
(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
|
||||
]);
|
||||
|
||||
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
|
||||
assert_eq!(
|
||||
parsed.incremental_mode,
|
||||
Some(FlowIncrementalMode::MemtableOnly)
|
||||
);
|
||||
assert_eq!(
|
||||
parsed.incremental_after_seqs.unwrap(),
|
||||
HashMap::from([(1, 10), (2, 20)])
|
||||
);
|
||||
assert!(parsed.return_region_seq);
|
||||
assert_eq!(parsed.sink_table_id, Some(1024));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_mode_requires_after_seqs() {
|
||||
let exts = HashMap::from([(
|
||||
FLOW_INCREMENTAL_MODE.to_string(),
|
||||
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
|
||||
)]);
|
||||
|
||||
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
|
||||
assert!(err.contains(FLOW_INCREMENTAL_AFTER_SEQS));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_invalid_mode() {
|
||||
let exts = HashMap::from([(FLOW_INCREMENTAL_MODE.to_string(), "foo".to_string())]);
|
||||
|
||||
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
|
||||
assert!(err.contains(FLOW_INCREMENTAL_MODE));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_invalid_after_seqs_json() {
|
||||
let exts = HashMap::from([
|
||||
(
|
||||
FLOW_INCREMENTAL_MODE.to_string(),
|
||||
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
|
||||
),
|
||||
(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
"not-json".to_string(),
|
||||
),
|
||||
]);
|
||||
|
||||
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
|
||||
assert!(err.contains(FLOW_INCREMENTAL_AFTER_SEQS));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_flow_extensions_invalid_sink_table_id() {
|
||||
let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "x".to_string())]);
|
||||
|
||||
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
|
||||
assert!(err.contains(FLOW_SINK_TABLE_ID));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_for_scan_missing_source_region() {
|
||||
let exts = HashMap::from([
|
||||
(
|
||||
FLOW_INCREMENTAL_MODE.to_string(),
|
||||
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
|
||||
),
|
||||
(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
r#"{"1":10}"#.to_string(),
|
||||
),
|
||||
]);
|
||||
|
||||
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
|
||||
let err = parsed.validate_for_scan(&[1, 2], Some(100)).unwrap_err();
|
||||
assert!(err.contains("Missing region 2"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_for_scan_sink_table_excluded() {
|
||||
let exts = HashMap::from([
|
||||
(
|
||||
FLOW_INCREMENTAL_MODE.to_string(),
|
||||
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
|
||||
),
|
||||
(
|
||||
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
|
||||
r#"{"1":10}"#.to_string(),
|
||||
),
|
||||
(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
|
||||
]);
|
||||
|
||||
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
|
||||
let apply_incremental = parsed.validate_for_scan(&[1, 2], Some(1024)).unwrap();
|
||||
assert!(!apply_incremental);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -433,6 +433,10 @@ impl QueryContext {
|
||||
self.snapshot_seqs.read().unwrap().clone()
|
||||
}
|
||||
|
||||
pub fn sst_min_sequences(&self) -> HashMap<u64, u64> {
|
||||
self.sst_min_sequences.read().unwrap().clone()
|
||||
}
|
||||
|
||||
pub fn get_snapshot(&self, region_id: u64) -> Option<u64> {
|
||||
self.snapshot_seqs.read().unwrap().get(®ion_id).cloned()
|
||||
}
|
||||
@@ -669,6 +673,8 @@ impl ConfigurationVariables {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_catalog::consts::DEFAULT_CATALOG_NAME;
|
||||
|
||||
use super::*;
|
||||
@@ -704,4 +710,30 @@ mod test {
|
||||
let context = QueryContext::with(DEFAULT_CATALOG_NAME, "test");
|
||||
assert_eq!("test", context.get_db_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_api_query_context_roundtrip_with_sequences() {
|
||||
let api_ctx = api::v1::QueryContext {
|
||||
current_catalog: "c1".to_string(),
|
||||
current_schema: "s1".to_string(),
|
||||
timezone: "UTC".to_string(),
|
||||
extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]),
|
||||
channel: Channel::Grpc as u32,
|
||||
snapshot_seqs: Some(api::v1::SnapshotSequences {
|
||||
snapshot_seqs: HashMap::from([(1, 100)]),
|
||||
sst_min_sequences: HashMap::from([(1, 90)]),
|
||||
}),
|
||||
explain: None,
|
||||
};
|
||||
|
||||
let session_ctx: QueryContext = api_ctx.clone().into();
|
||||
let roundtrip_api: api::v1::QueryContext = session_ctx.into();
|
||||
|
||||
assert_eq!(roundtrip_api.current_catalog, api_ctx.current_catalog);
|
||||
assert_eq!(roundtrip_api.current_schema, api_ctx.current_schema);
|
||||
assert_eq!(roundtrip_api.timezone, api_ctx.timezone);
|
||||
assert_eq!(roundtrip_api.extensions, api_ctx.extensions);
|
||||
assert_eq!(roundtrip_api.channel, api_ctx.channel);
|
||||
assert_eq!(roundtrip_api.snapshot_seqs, api_ctx.snapshot_seqs);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user