1use std::str::FromStr;
16
17use api::v1::region::{StrictWindow, compact_request};
18use arrow::datatypes::DataType as ArrowDataType;
19use common_error::ext::BoxedError;
20use common_macro::admin_fn;
21use common_query::error::{
22 InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
23 UnsupportedInputDataTypeSnafu,
24};
25use common_telemetry::info;
26use datafusion_expr::{Signature, Volatility};
27use datatypes::prelude::*;
28use session::context::QueryContextRef;
29use session::table_name::table_name_to_full_name;
30use snafu::{ResultExt, ensure};
31use table::requests::{CompactTableRequest, FlushTableRequest};
32
33use crate::handlers::TableMutationHandlerRef;
34
/// Compaction type name that selects strict-window compaction.
const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
/// Short alias accepted for the strict-window compaction type.
const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";

/// Parallelism used when the caller does not supply a `parallelism` option.
const DEFAULT_COMPACTION_PARALLELISM: u32 = 1;
41
42#[admin_fn(
43 name = FlushTableFunction,
44 display_name = flush_table,
45 sig_fn = flush_signature,
46 ret = uint64
47)]
48pub(crate) async fn flush_table(
49 table_mutation_handler: &TableMutationHandlerRef,
50 query_ctx: &QueryContextRef,
51 params: &[ValueRef<'_>],
52) -> Result<Value> {
53 ensure!(
54 params.len() == 1,
55 InvalidFuncArgsSnafu {
56 err_msg: format!(
57 "The length of the args is not correct, expect 1, have: {}",
58 params.len()
59 ),
60 }
61 );
62
63 let ValueRef::String(table_name) = params[0] else {
64 return UnsupportedInputDataTypeSnafu {
65 function: "flush_table",
66 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
67 }
68 .fail();
69 };
70
71 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
72 .map_err(BoxedError::new)
73 .context(TableMutationSnafu)?;
74
75 let affected_rows = table_mutation_handler
76 .flush(
77 FlushTableRequest {
78 catalog_name,
79 schema_name,
80 table_name,
81 },
82 query_ctx.clone(),
83 )
84 .await?;
85
86 Ok(Value::from(affected_rows as u64))
87}
88
89#[admin_fn(
90 name = CompactTableFunction,
91 display_name = compact_table,
92 sig_fn = compact_signature,
93 ret = uint64
94)]
95pub(crate) async fn compact_table(
96 table_mutation_handler: &TableMutationHandlerRef,
97 query_ctx: &QueryContextRef,
98 params: &[ValueRef<'_>],
99) -> Result<Value> {
100 let request = parse_compact_request(params, query_ctx)?;
101 info!("Compact table request: {:?}", request);
102
103 let affected_rows = table_mutation_handler
104 .compact(request, query_ctx.clone())
105 .await?;
106
107 Ok(Value::from(affected_rows as u64))
108}
109
110fn flush_signature() -> Signature {
111 Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
112}
113
114fn compact_signature() -> Signature {
115 Signature::variadic(vec![ArrowDataType::Utf8], Volatility::Immutable)
116}
117
118fn parse_compact_request(
125 params: &[ValueRef<'_>],
126 query_ctx: &QueryContextRef,
127) -> Result<CompactTableRequest> {
128 ensure!(
129 !params.is_empty() && params.len() <= 3,
130 InvalidFuncArgsSnafu {
131 err_msg: format!(
132 "The length of the args is not correct, expect 1-4, have: {}",
133 params.len()
134 ),
135 }
136 );
137
138 let (table_name, compact_type, parallelism) = match params {
139 [ValueRef::String(table_name)] => (
141 table_name,
142 compact_request::Options::Regular(Default::default()),
143 DEFAULT_COMPACTION_PARALLELISM,
144 ),
145 [
147 ValueRef::String(table_name),
148 ValueRef::String(compact_ty_str),
149 ] => {
150 let (compact_type, parallelism) = parse_compact_options(compact_ty_str, None)?;
151 (table_name, compact_type, parallelism)
152 }
153 [
155 ValueRef::String(table_name),
156 ValueRef::String(compact_ty_str),
157 ValueRef::String(options_str),
158 ] => {
159 let (compact_type, parallelism) =
160 parse_compact_options(compact_ty_str, Some(options_str))?;
161 (table_name, compact_type, parallelism)
162 }
163 _ => {
164 return UnsupportedInputDataTypeSnafu {
165 function: "compact_table",
166 datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
167 }
168 .fail();
169 }
170 };
171
172 let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
173 .map_err(BoxedError::new)
174 .context(TableMutationSnafu)?;
175
176 Ok(CompactTableRequest {
177 catalog_name,
178 schema_name,
179 table_name,
180 compact_options: compact_type,
181 parallelism,
182 })
183}
184
185fn parse_compact_options(
188 type_str: &str,
189 option: Option<&str>,
190) -> Result<(compact_request::Options, u32)> {
191 if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
192 | type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
193 {
194 let Some(option_str) = option else {
195 return Ok((
196 compact_request::Options::StrictWindow(StrictWindow { window_seconds: 0 }),
197 DEFAULT_COMPACTION_PARALLELISM,
198 ));
199 };
200
201 if let Ok(window_seconds) = i64::from_str(option_str) {
203 return Ok((
204 compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
205 DEFAULT_COMPACTION_PARALLELISM,
206 ));
207 };
208
209 let mut window_seconds = 0i64;
211 let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
212
213 let pairs: Vec<&str> = option_str.split(',').collect();
214 for pair in pairs {
215 let kv: Vec<&str> = pair.trim().split('=').collect();
216 if kv.len() != 2 {
217 return InvalidFuncArgsSnafu {
218 err_msg: format!("Invalid key-value pair: {}", pair.trim()),
219 }
220 .fail();
221 }
222
223 let key = kv[0].trim();
224 let value = kv[1].trim();
225
226 match key {
227 "window" | "window_seconds" => {
228 window_seconds = i64::from_str(value).map_err(|_| {
229 InvalidFuncArgsSnafu {
230 err_msg: format!("Invalid value for window: {}", value),
231 }
232 .build()
233 })?;
234 }
235 "parallelism" => {
236 parallelism = value.parse::<u32>().map_err(|_| {
237 InvalidFuncArgsSnafu {
238 err_msg: format!("Invalid value for parallelism: {}", value),
239 }
240 .build()
241 })?;
242 }
243 _ => {
244 return InvalidFuncArgsSnafu {
245 err_msg: format!("Unknown parameter: {}", key),
246 }
247 .fail();
248 }
249 }
250 }
251
252 Ok((
253 compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
254 parallelism,
255 ))
256 } else {
257 let Some(option_str) = option else {
259 return Ok((
260 compact_request::Options::Regular(Default::default()),
261 DEFAULT_COMPACTION_PARALLELISM,
262 ));
263 };
264
265 let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
266 let pairs: Vec<&str> = option_str.split(',').collect();
267 for pair in pairs {
268 let kv: Vec<&str> = pair.trim().split('=').collect();
269 if kv.len() != 2 {
270 return InvalidFuncArgsSnafu {
271 err_msg: format!("Invalid key-value pair: {}", pair.trim()),
272 }
273 .fail();
274 }
275
276 let key = kv[0].trim();
277 let value = kv[1].trim();
278
279 match key {
280 "parallelism" => {
281 parallelism = value.parse::<u32>().map_err(|_| {
282 InvalidFuncArgsSnafu {
283 err_msg: format!("Invalid value for parallelism: {}", value),
284 }
285 .build()
286 })?;
287 }
288 _ => {
289 return InvalidFuncArgsSnafu {
290 err_msg: format!("Unknown parameter: {}", key),
291 }
292 .fail();
293 }
294 }
295 }
296
297 Ok((
298 compact_request::Options::Regular(Default::default()),
299 parallelism,
300 ))
301 }
302}
303
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::region::compact_request::Options;
    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use datafusion_expr::ColumnarValue;
    use session::context::QueryContext;

    use super::*;
    use crate::function::FunctionContext;
    use crate::function_factory::ScalarFunctionFactory;

    // Generates three tests for a single-argument table admin function:
    // metadata (name, return type, signature), the error raised when no table
    // mutation handler is available, and a successful call against the mock
    // function context.
    macro_rules! define_table_function_test {
        ($name: ident, $func: ident) => {
            paste::paste!{
                #[test]
                fn [<test_ $name _misc>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let f = factory.provide(FunctionContext::mock());
                    assert_eq!(stringify!($name), f.name());
                    assert_eq!(
                        DataType::UInt64,
                        f.return_type(&[]).unwrap()
                    );
                    assert!(matches!(f.signature(),
                                     datafusion_expr::Signature {
                                         type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
                                         volatility: datafusion_expr::Volatility::Immutable,
                                         ..
                                     } if valid_types == &vec![ArrowDataType::Utf8]));
                }

                #[tokio::test]
                async fn [<test_ $name _missing_table_mutation>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::default());
                    let f = provider.as_async().unwrap();

                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![
                            ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
                        ],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };
                    let result = f.invoke_async_with_args(func_args).await.unwrap_err();
                    assert_eq!(
                        "Execution error: Handler error: Missing TableMutationHandler, not expected",
                        result.to_string()
                    );
                }

                #[tokio::test]
                async fn [<test_ $name>]() {
                    let factory: ScalarFunctionFactory = $func::factory().into();
                    let provider = factory.provide(FunctionContext::mock());
                    let f = provider.as_async().unwrap();

                    let func_args = datafusion::logical_expr::ScalarFunctionArgs {
                        args: vec![
                            ColumnarValue::Array(Arc::new(StringArray::from(vec!["test"]))),
                        ],
                        arg_fields: vec![
                            Arc::new(Field::new("arg_0", DataType::Utf8, false)),
                        ],
                        return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
                        number_rows: 1,
                        config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
                    };
                    let result = f.invoke_async_with_args(func_args).await.unwrap();

                    match result {
                        ColumnarValue::Array(array) => {
                            let result_array = array.as_any().downcast_ref::<arrow::array::UInt64Array>().unwrap();
                            assert_eq!(result_array.value(0), 42u64);
                        }
                        ColumnarValue::Scalar(scalar) => {
                            assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(42)));
                        }
                    }
                }
            }
        }
    }

    define_table_function_test!(flush_table, FlushTableFunction);

    /// Parses every argument list in `cases` and asserts that it yields the
    /// paired expected request.
    fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
        for (params, expected) in cases {
            let params = params
                .iter()
                .map(|s| ValueRef::String(s))
                .collect::<Vec<_>>();

            assert_eq!(
                expected,
                &parse_compact_request(&params, &QueryContext::arc()).unwrap()
            );
        }
    }

    /// Asserts that parsing `args` as `compact_table` arguments fails.
    fn assert_parse_fails(args: &[&str]) {
        let params = args
            .iter()
            .map(|s| ValueRef::String(s))
            .collect::<Vec<_>>();

        assert!(
            parse_compact_request(&params, &QueryContext::arc()).is_err(),
            "expected arguments {:?} to be rejected",
            args
        );
    }

    /// Request expected for table `table` in the default catalog and schema.
    fn expected_request(compact_options: Options, parallelism: u32) -> CompactTableRequest {
        CompactTableRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "table".to_string(),
            compact_options,
            parallelism,
        }
    }

    /// Regular compaction options with default settings.
    fn regular() -> Options {
        Options::Regular(Default::default())
    }

    /// Strict-window compaction options with the given window.
    fn strict_window(window_seconds: i64) -> Options {
        Options::StrictWindow(StrictWindow { window_seconds })
    }

    #[test]
    fn test_parse_compact_params() {
        check_parse_compact_params(&[
            // Table name only, with and without schema / catalog qualifiers.
            (&["table"], expected_request(regular(), 1)),
            (
                &[&format!("{}.table", DEFAULT_SCHEMA_NAME)],
                expected_request(regular(), 1),
            ),
            (
                &[&format!(
                    "{}.{}.table",
                    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME
                )],
                expected_request(regular(), 1),
            ),
            // Compaction type without options.
            (&["table", "regular"], expected_request(regular(), 1)),
            (
                &["table", "strict_window"],
                expected_request(strict_window(0), 1),
            ),
            // Strict window with a bare integer window.
            (
                &["table", "strict_window", "3600"],
                expected_request(strict_window(3600), 1),
            ),
            (
                &["table", "swcs", "120"],
                expected_request(strict_window(120), 1),
            ),
            // Key-value options.
            (
                &["table", "regular", "parallelism=4"],
                expected_request(regular(), 4),
            ),
            (
                &["table", "strict_window", "window=3600,parallelism=2"],
                expected_request(strict_window(3600), 2),
            ),
            (
                &["table", "strict_window", "window=3600"],
                expected_request(strict_window(3600), 1),
            ),
            (
                &["table", "strict_window", "window_seconds=7200"],
                expected_request(strict_window(7200), 1),
            ),
            (
                &["table", "strict_window", "window=1800"],
                expected_request(strict_window(1800), 1),
            ),
            (
                &["table", "regular", "parallelism=8"],
                expected_request(regular(), 8),
            ),
        ]);

        let rejected: &[&[&str]] = &[
            // Option that is neither an integer nor a key-value pair.
            &["table", "strict_window", "abc"],
            &["a.b.table", "strict_window", "abc"],
            // Too many arguments.
            &["table", "regular", "options", "invalid"],
            &["table", "regular", "options", "4", "extra"],
            // Key without a value.
            &["table", "strict_window", "window"],
            // Unknown key.
            &["table", "strict_window", "invalid_key=123"],
            // Regular compaction with a malformed option.
            &["table", "regular", "abcd"],
            // Non-numeric values.
            &["table", "strict_window", "window=abc"],
            &["table", "strict_window", "parallelism=abc"],
        ];
        for args in rejected {
            assert_parse_fails(args);
        }
    }
}