1use std::collections::HashMap;
16
17use common_base::memory_limit::MemoryLimit;
18use serde::{Deserialize, Serialize};
19use store_api::storage::RegionId;
20use table::metadata::TableId;
21
22use crate::error::{Error, InvalidQueryContextExtensionSnafu, Result};
23
/// Query-context extension key carrying a JSON object that maps region ids to sequence numbers.
///
/// Keys of the JSON object are decimal `u64` region ids; values are sequences given either as
/// JSON integers or as decimal strings.
pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs";
/// Query-context extension key selecting the incremental mode of a flow query.
///
/// The only accepted value is [`FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY`] (matched ASCII
/// case-insensitively).
pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode";
/// Query-context extension key holding a boolean (`"true"` / `"false"`, ASCII case-insensitive).
///
/// When it is true, region watermark collection is requested (see
/// `should_collect_region_watermark`).
pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq";
/// Query-context extension key holding the sink table id, parsed as a [`TableId`].
pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id";
/// Query-context extension key holding a boolean that toggles remote dynamic filter pushdown.
///
/// The toggle is treated as enabled when the key is absent.
pub const QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN: &str =
    "query.enable_remote_dynamic_filter_pushdown";

/// Value of [`FLOW_INCREMENTAL_MODE`] that selects [`FlowIncrementalMode::MemtableOnly`].
pub const FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY: &str = "memtable_only";
33
/// Configuration options for query execution.
///
/// Because of `#[serde(default)]`, any field missing from the deserialized input takes its
/// value from [`QueryOptions::default`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct QueryOptions {
    /// Query parallelism; defaults to `0`.
    // NOTE(review): `0` presumably means "choose automatically" — confirm against the consumer.
    pub parallelism: usize,
    /// Whether query fallback is allowed; defaults to `false`.
    // NOTE(review): the fallback target is not visible in this file — confirm.
    pub allow_query_fallback: bool,
    /// Memory limit of the query memory pool; defaults to `MemoryLimit::default()`.
    pub memory_pool_size: MemoryLimit,
    /// Whether per-region metrics are enabled; defaults to `false`.
    ///
    /// Marked `#[serde(skip)]`, so it is neither serialized nor read from configuration input.
    #[serde(skip)]
    pub enable_per_region_metrics: bool,
}
50
51#[allow(clippy::derivable_impls)]
52impl Default for QueryOptions {
53 fn default() -> Self {
54 Self {
55 parallelism: 0,
56 allow_query_fallback: false,
57 memory_pool_size: MemoryLimit::default(),
58 enable_per_region_metrics: false,
59 }
60 }
61}
62
/// Incremental mode requested by a flow query through the [`FLOW_INCREMENTAL_MODE`] extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowIncrementalMode {
    /// Selected by the extension value [`FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY`].
    ///
    /// In this mode [`FLOW_INCREMENTAL_AFTER_SEQS`] must be present and non-empty.
    // NOTE(review): presumably restricts the scan to memtable data — confirm with the scanner.
    MemtableOnly,
}
67
/// Flow-related options decoded from the query-context extension map.
///
/// Built by [`FlowQueryExtensions::parse_flow_extensions`]; the `Default` value has every option
/// unset (`None` / `false`).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FlowQueryExtensions {
    /// Sequence per region, keyed by the `u64` form of the region id, parsed from
    /// [`FLOW_INCREMENTAL_AFTER_SEQS`]; `None` when the extension is absent.
    // NOTE(review): presumably only data after the given sequence is read — confirm.
    pub incremental_after_seqs: Option<HashMap<u64, u64>>,
    /// Incremental mode parsed from [`FLOW_INCREMENTAL_MODE`]; `None` when the extension is absent.
    pub incremental_mode: Option<FlowIncrementalMode>,
    /// Boolean parsed from [`FLOW_RETURN_REGION_SEQ`]; `false` when the extension is absent.
    pub return_region_seq: bool,
    /// Sink table id parsed from [`FLOW_SINK_TABLE_ID`]; `None` when the extension is absent.
    pub sink_table_id: Option<TableId>,
}
79
80impl FlowQueryExtensions {
81 pub fn parse_flow_extensions(extensions: &HashMap<String, String>) -> Result<Option<Self>> {
87 let has_flow_context = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS)
88 || extensions.contains_key(FLOW_INCREMENTAL_MODE)
89 || extensions.contains_key(FLOW_RETURN_REGION_SEQ)
90 || extensions.contains_key(FLOW_SINK_TABLE_ID);
91
92 if !has_flow_context {
93 return Ok(None);
94 }
95
96 let incremental_mode = extensions
97 .get(FLOW_INCREMENTAL_MODE)
98 .map(|value| match value.as_str() {
99 v if v.eq_ignore_ascii_case(FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY) => {
100 Ok(FlowIncrementalMode::MemtableOnly)
101 }
102 _ => Err(invalid_query_context_extension(format!(
103 "Invalid value for {}: {}",
104 FLOW_INCREMENTAL_MODE, value
105 ))),
106 })
107 .transpose()?;
108
109 let incremental_after_seqs = extensions
110 .get(FLOW_INCREMENTAL_AFTER_SEQS)
111 .map(|value| parse_incremental_after_seqs(value.as_str()))
112 .transpose()?;
113
114 let return_region_seq = extensions
115 .get(FLOW_RETURN_REGION_SEQ)
116 .map(|value| parse_bool(FLOW_RETURN_REGION_SEQ, value.as_str()))
117 .transpose()?
118 .unwrap_or(false);
119
120 let sink_table_id = extensions
121 .get(FLOW_SINK_TABLE_ID)
122 .map(|value| {
123 value.parse::<TableId>().map_err(|_| {
124 invalid_query_context_extension(format!(
125 "Invalid value for {}: {}",
126 FLOW_SINK_TABLE_ID, value
127 ))
128 })
129 })
130 .transpose()?;
131
132 if matches!(incremental_mode, Some(FlowIncrementalMode::MemtableOnly)) {
133 let after_seqs = incremental_after_seqs.as_ref().ok_or_else(|| {
134 invalid_query_context_extension(format!(
135 "{} is required when {}={}.",
136 FLOW_INCREMENTAL_AFTER_SEQS,
137 FLOW_INCREMENTAL_MODE,
138 FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
139 ))
140 })?;
141 if after_seqs.is_empty() {
142 return Err(invalid_query_context_extension(format!(
143 "{} must not be empty when {}={}.",
144 FLOW_INCREMENTAL_AFTER_SEQS,
145 FLOW_INCREMENTAL_MODE,
146 FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
147 )));
148 }
149 }
150
151 Ok(Some(Self {
152 incremental_after_seqs,
153 incremental_mode,
154 return_region_seq,
155 sink_table_id,
156 }))
157 }
158
159 pub fn validate_for_scan(&self, source_region_id: RegionId) -> Result<bool> {
160 if self.sink_table_id.is_some() && self.sink_table_id == Some(source_region_id.table_id()) {
161 return Ok(false);
162 }
163
164 if matches!(
165 self.incremental_mode,
166 Some(FlowIncrementalMode::MemtableOnly)
167 ) {
168 let after_seqs = self.incremental_after_seqs.as_ref().ok_or_else(|| {
169 invalid_query_context_extension(format!(
170 "{} is required when {}=memtable_only.",
171 FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
172 ))
173 })?;
174
175 if !after_seqs.contains_key(&source_region_id.as_u64()) {
176 return Err(invalid_query_context_extension(format!(
177 "Missing region {} in {} when {}=memtable_only.",
178 source_region_id, FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
179 )));
180 }
181 }
182
183 Ok(self.incremental_after_seqs.is_some())
184 }
185
186 pub fn should_collect_region_watermark(&self) -> bool {
187 should_collect_region_watermark(
188 self.return_region_seq,
189 self.incremental_after_seqs.is_some(),
190 )
191 }
192}
193
194pub fn remote_dyn_filter_pushdown_enabled_from_extensions(
201 extensions: &HashMap<String, String>,
202) -> Result<bool> {
203 extensions
204 .get(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN)
205 .map(|value| parse_bool(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, value.as_str()))
206 .transpose()
207 .map(|value| value.unwrap_or(true))
208}
209
210pub fn should_collect_region_watermark_from_extensions(
215 extensions: &HashMap<String, String>,
216) -> bool {
217 let return_region_seq = extensions
218 .get(FLOW_RETURN_REGION_SEQ)
219 .is_some_and(|value| value.eq_ignore_ascii_case("true"));
220 let has_incremental_after_seqs = extensions.contains_key(FLOW_INCREMENTAL_AFTER_SEQS);
221
222 should_collect_region_watermark(return_region_seq, has_incremental_after_seqs)
223}
224
/// Shared rule for watermark collection: collect when either flag is set.
fn should_collect_region_watermark(
    return_region_seq: bool,
    has_incremental_after_seqs: bool,
) -> bool {
    matches!(
        (return_region_seq, has_incremental_after_seqs),
        (true, _) | (_, true)
    )
}
231
232fn parse_incremental_after_seqs(value: &str) -> Result<HashMap<u64, u64>> {
233 let raw = serde_json::from_str::<HashMap<String, serde_json::Value>>(value).map_err(|e| {
234 invalid_query_context_extension(format!(
235 "Invalid JSON for {}: {} ({})",
236 FLOW_INCREMENTAL_AFTER_SEQS, value, e
237 ))
238 })?;
239
240 raw.into_iter()
241 .map(|(region_id, raw_seq)| {
242 let region_id = region_id.parse::<u64>().map_err(|_| {
243 invalid_query_context_extension(format!(
244 "Invalid region id in {}: {}",
245 FLOW_INCREMENTAL_AFTER_SEQS, region_id
246 ))
247 })?;
248
249 let seq = match raw_seq {
250 serde_json::Value::Number(num) => num.as_u64().ok_or_else(|| {
251 invalid_query_context_extension(format!(
252 "Invalid sequence value in {} for region {}: {}",
253 FLOW_INCREMENTAL_AFTER_SEQS, region_id, num
254 ))
255 })?,
256 serde_json::Value::String(s) => s.parse::<u64>().map_err(|_| {
257 invalid_query_context_extension(format!(
258 "Invalid sequence string in {} for region {}: {}",
259 FLOW_INCREMENTAL_AFTER_SEQS, region_id, s
260 ))
261 })?,
262 _ => {
263 return Err(invalid_query_context_extension(format!(
264 "Invalid sequence value type in {} for region {}",
265 FLOW_INCREMENTAL_AFTER_SEQS, region_id
266 )));
267 }
268 };
269
270 Ok((region_id, seq))
271 })
272 .collect()
273}
274
275fn parse_bool(option_name: &str, value: &str) -> Result<bool> {
276 match value {
277 v if v.eq_ignore_ascii_case("true") => Ok(true),
278 v if v.eq_ignore_ascii_case("false") => Ok(false),
279 _ => Err(invalid_query_context_extension(format!(
280 "Invalid value for {}: {}",
281 option_name, value
282 ))),
283 }
284}
285
286fn invalid_query_context_extension(reason: String) -> Error {
287 InvalidQueryContextExtensionSnafu { reason }.build()
288}
289
/// Unit tests for flow extension parsing, scan validation and the watermark / pushdown helpers.
#[cfg(test)]
mod flow_extension_tests {
    use super::*;

    /// An extension map without any flow key is not a flow query: parsing yields `None`.
    #[test]
    fn test_parse_flow_extensions_returns_none_for_non_flow_query() {
        let exts = HashMap::new();
        let parsed = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap();

        assert_eq!(parsed, None);
    }

    /// The pushdown toggle is enabled when the key is absent.
    #[test]
    fn test_remote_dyn_filter_pushdown_enabled_from_extensions_defaults_true() {
        assert!(remote_dyn_filter_pushdown_enabled_from_extensions(&HashMap::new()).unwrap());
    }

    /// Explicit `false` / `true` values of the pushdown toggle are honored.
    #[test]
    fn test_remote_dyn_filter_pushdown_enabled_from_extensions_parses_bool() {
        let exts = HashMap::from([(
            QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN.to_string(),
            "false".to_string(),
        )]);
        assert!(!remote_dyn_filter_pushdown_enabled_from_extensions(&exts).unwrap());

        let exts = HashMap::from([(
            QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN.to_string(),
            "true".to_string(),
        )]);
        assert!(remote_dyn_filter_pushdown_enabled_from_extensions(&exts).unwrap());
    }

    /// A non-boolean pushdown value is rejected, and the error names the offending key.
    #[test]
    fn test_remote_dyn_filter_pushdown_enabled_from_extensions_rejects_invalid_bool() {
        let exts = HashMap::from([(
            QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN.to_string(),
            "invalid".to_string(),
        )]);

        let err = remote_dyn_filter_pushdown_enabled_from_extensions(&exts).unwrap_err();
        assert!(format!("{err}").contains(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN));
    }

    /// All four flow keys together parse into the corresponding fields.
    #[test]
    fn test_parse_flow_extensions_memtable_only_success() {
        let exts = HashMap::from([
            (
                FLOW_INCREMENTAL_MODE.to_string(),
                FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
            ),
            (
                FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                r#"{"1":10,"2":20}"#.to_string(),
            ),
            (FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string()),
            (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
        ]);

        let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
            .unwrap()
            .unwrap();
        assert_eq!(
            parsed.incremental_mode,
            Some(FlowIncrementalMode::MemtableOnly)
        );
        assert_eq!(
            parsed.incremental_after_seqs.unwrap(),
            HashMap::from([(1, 10), (2, 20)])
        );
        assert!(parsed.return_region_seq);
        assert_eq!(parsed.sink_table_id, Some(1024));
    }

    /// Memtable-only mode without the after-seqs key is an error naming the missing key.
    #[test]
    fn test_parse_flow_extensions_mode_requires_after_seqs() {
        let exts = HashMap::from([(
            FLOW_INCREMENTAL_MODE.to_string(),
            FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
        )]);

        let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
        assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
    }

    /// An unknown incremental mode value is rejected.
    #[test]
    fn test_parse_flow_extensions_invalid_mode() {
        let exts = HashMap::from([(FLOW_INCREMENTAL_MODE.to_string(), "foo".to_string())]);

        let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
        assert!(format!("{err}").contains(FLOW_INCREMENTAL_MODE));
    }

    /// A payload that is not valid JSON is rejected.
    #[test]
    fn test_parse_flow_extensions_invalid_after_seqs_json() {
        let exts = HashMap::from([
            (
                FLOW_INCREMENTAL_MODE.to_string(),
                FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
            ),
            (
                FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                "not-json".to_string(),
            ),
        ]);

        let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
        assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
    }

    /// Sequences encoded as JSON strings are accepted alongside JSON numbers.
    #[test]
    fn test_parse_flow_extensions_after_seqs_string_values() {
        let exts = HashMap::from([(
            FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
            r#"{"1":"10","2":"20"}"#.to_string(),
        )]);

        let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
            .unwrap()
            .unwrap();
        assert_eq!(
            parsed.incremental_after_seqs.unwrap(),
            HashMap::from([(1, 10), (2, 20)])
        );
    }

    /// A sequence of any other JSON type (here a boolean) is rejected.
    #[test]
    fn test_parse_flow_extensions_after_seqs_invalid_value_type() {
        let exts = HashMap::from([(
            FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
            r#"{"1":true}"#.to_string(),
        )]);

        let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
        assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
    }

    /// A sink table id that is not a number is rejected.
    #[test]
    fn test_parse_flow_extensions_invalid_sink_table_id() {
        let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "x".to_string())]);

        let err = FlowQueryExtensions::parse_flow_extensions(&exts).unwrap_err();
        assert!(format!("{err}").contains(FLOW_SINK_TABLE_ID));
    }

    /// In memtable-only mode, scanning a region that has no entry in the after-seqs map fails.
    #[test]
    fn test_validate_for_scan_missing_source_region() {
        let source_region_id = RegionId::new(100, 2);
        let existing_region_id = RegionId::new(100, 1);
        let exts = HashMap::from([
            (
                FLOW_INCREMENTAL_MODE.to_string(),
                FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
            ),
            (
                FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                format!(r#"{{"{}":10}}"#, existing_region_id.as_u64()),
            ),
        ]);

        let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
            .unwrap()
            .unwrap();
        let err = parsed.validate_for_scan(source_region_id).unwrap_err();
        assert!(format!("{err}").contains("Missing region"));
    }

    /// A region of the sink table yields `false` even though it has an after-seqs entry.
    #[test]
    fn test_validate_for_scan_sink_table_excluded() {
        let source_region_id = RegionId::new(1024, 1);
        let exts = HashMap::from([
            (
                FLOW_INCREMENTAL_MODE.to_string(),
                FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
            ),
            (
                FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
                format!(r#"{{"{}":10}}"#, source_region_id.as_u64()),
            ),
            (FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
        ]);

        let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
            .unwrap()
            .unwrap();
        let apply_incremental = parsed.validate_for_scan(source_region_id).unwrap();
        assert!(!apply_incremental);
    }

    /// With every option unset, no watermark collection is requested.
    #[test]
    fn test_should_collect_region_watermark_defaults_false() {
        let parsed = FlowQueryExtensions::default();
        assert!(!parsed.should_collect_region_watermark());
    }

    /// `return_region_seq` alone requests watermark collection.
    #[test]
    fn test_should_collect_region_watermark_true_for_return_region_seq() {
        let parsed = FlowQueryExtensions {
            return_region_seq: true,
            ..Default::default()
        };
        assert!(parsed.should_collect_region_watermark());
    }

    /// The presence of `incremental_after_seqs` alone requests watermark collection.
    #[test]
    fn test_should_collect_region_watermark_true_for_incremental_query() {
        let parsed = FlowQueryExtensions {
            incremental_after_seqs: Some(HashMap::from([(1, 10)])),
            ..Default::default()
        };
        assert!(parsed.should_collect_region_watermark());
    }

    /// The map-based helper applies the same rule directly to the raw extension map.
    #[test]
    fn test_should_collect_region_watermark_from_extensions() {
        let exts = HashMap::from([(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string())]);
        assert!(should_collect_region_watermark_from_extensions(&exts));

        let exts = HashMap::from([(
            FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
            r#"{"1":10}"#.to_string(),
        )]);
        assert!(should_collect_region_watermark_from_extensions(&exts));

        let exts = HashMap::from([(FLOW_RETURN_REGION_SEQ.to_string(), "false".to_string())]);
        assert!(!should_collect_region_watermark_from_extensions(&exts));
        assert!(!should_collect_region_watermark_from_extensions(
            &HashMap::new()
        ));
    }

    /// The return-region-seq key on its own is enough to produce `Some(..)`.
    #[test]
    fn test_parse_flow_extensions_return_region_seq_only_returns_some() {
        let exts = HashMap::from([(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string())]);

        let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
            .unwrap()
            .unwrap();

        assert!(parsed.return_region_seq);
    }

    /// The sink-table-id key on its own is enough to produce `Some(..)`.
    #[test]
    fn test_parse_flow_extensions_sink_table_only_returns_some() {
        let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string())]);

        let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
            .unwrap()
            .unwrap();

        assert_eq!(parsed.sink_table_id, Some(1024));
    }

    /// The after-seqs key on its own (no mode) is enough to produce `Some(..)`.
    #[test]
    fn test_parse_flow_extensions_incremental_after_seqs_only_returns_some() {
        let exts = HashMap::from([(
            FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
            r#"{"1":10}"#.to_string(),
        )]);

        let parsed = FlowQueryExtensions::parse_flow_extensions(&exts)
            .unwrap()
            .unwrap();

        assert_eq!(
            parsed.incremental_after_seqs,
            Some(HashMap::from([(1, 10)]))
        );
    }
}