diff --git a/src/common/function/src/admin/gc.rs b/src/common/function/src/admin/gc.rs index 07473b724e..ad2e3d3890 100644 --- a/src/common/function/src/admin/gc.rs +++ b/src/common/function/src/admin/gc.rs @@ -31,6 +31,7 @@ use crate::handlers::ProcedureServiceHandlerRef; use crate::helper::cast_u64; const DEFAULT_GC_TIMEOUT: Duration = Duration::from_secs(60); +const DEFAULT_FULL_FILE_LISTING: bool = false; #[admin_fn( name = GcRegionsFunction, @@ -43,30 +44,12 @@ pub(crate) async fn gc_regions( _ctx: &QueryContextRef, params: &[ValueRef<'_>], ) -> Result { - ensure!( - !params.is_empty(), - InvalidFuncArgsSnafu { - err_msg: "The length of the args is not correct, expect at least 1 region id, have 0" - .to_string(), - } - ); - - let mut region_ids = Vec::with_capacity(params.len()); - for param in params { - let Some(region_id) = cast_u64(param)? else { - return UnsupportedInputDataTypeSnafu { - function: "gc_regions", - datatypes: params.iter().map(|v| v.data_type()).collect::>(), - } - .fail(); - }; - region_ids.push(region_id); - } + let (region_ids, full_file_listing) = parse_gc_regions_params(params)?; let resp = procedure_service_handler .gc_regions(GcRegionsRequest { region_ids, - full_file_listing: false, + full_file_listing, timeout: DEFAULT_GC_TIMEOUT, }) .await?; @@ -85,11 +68,68 @@ pub(crate) async fn gc_table( query_ctx: &QueryContextRef, params: &[ValueRef<'_>], ) -> Result { + let (catalog_name, schema_name, table_name, full_file_listing) = + parse_gc_table_params(params, query_ctx)?; + + let resp = procedure_service_handler + .gc_table(GcTableRequest { + catalog_name, + schema_name, + table_name, + full_file_listing, + timeout: DEFAULT_GC_TIMEOUT, + }) + .await?; + + Ok(Value::from(resp.processed_regions)) +} + +fn parse_gc_regions_params(params: &[ValueRef<'_>]) -> Result<(Vec, bool)> { ensure!( - params.len() == 1, + !params.is_empty(), + InvalidFuncArgsSnafu { + err_msg: "The length of the args is not correct, expect at least 1 region id, have 0" + .to_string(), + } + ); + + let (full_file_listing, region_params) = match params.last() { + Some(ValueRef::Boolean(value)) => (*value, ¶ms[..params.len() - 1]), + _ => (DEFAULT_FULL_FILE_LISTING, params), + }; + + ensure!( + !region_params.is_empty(), + InvalidFuncArgsSnafu { + err_msg: "The length of the args is not correct, expect at least 1 region id" + .to_string(), + } + ); + + let mut region_ids = Vec::with_capacity(region_params.len()); + for param in region_params { + let Some(region_id) = cast_u64(param)? else { + return UnsupportedInputDataTypeSnafu { + function: "gc_regions", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + region_ids.push(region_id); + } + + Ok((region_ids, full_file_listing)) +} + +fn parse_gc_table_params( + params: &[ValueRef<'_>], + query_ctx: &QueryContextRef, +) -> Result<(String, String, String, bool)> { + ensure!( + matches!(params.len(), 1 | 2), InvalidFuncArgsSnafu { err_msg: format!( - "The length of the args is not correct, expect 1, have: {}", + "The length of the args is not correct, expect 1 or 2, have: {}", params.len() ), } @@ -103,37 +143,78 @@ pub(crate) async fn gc_table( .fail(); }; + let full_file_listing = if params.len() == 2 { + let ValueRef::Boolean(value) = params[1] else { + return UnsupportedInputDataTypeSnafu { + function: "gc_table", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + value + } else { + DEFAULT_FULL_FILE_LISTING + }; + let (catalog_name, schema_name, table_name) = session::table_name::table_name_to_full_name(table_name, query_ctx) .map_err(BoxedError::new) .context(TableMutationSnafu)?; - let resp = procedure_service_handler - .gc_table(GcTableRequest { - catalog_name, - schema_name, - table_name, - full_file_listing: false, - timeout: DEFAULT_GC_TIMEOUT, - }) - .await?; - - Ok(Value::from(resp.processed_regions)) + Ok((catalog_name, schema_name, table_name, full_file_listing)) } fn gc_regions_signature() -> Signature { - Signature::variadic( - ConcreteDataType::numerics() - .into_iter() - .map(|dt| dt.as_arrow_type()) - .collect(), - Volatility::Immutable, - ) + Signature::variadic_any(Volatility::Immutable) } fn gc_table_signature() -> Signature { Signature::one_of( - vec![TypeSignature::Uniform(1, vec![ArrowDataType::Utf8])], + vec![ + TypeSignature::Uniform(1, vec![ArrowDataType::Utf8]), + TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Boolean]), + ], Volatility::Immutable, ) } + +#[cfg(test)] +mod tests { + use session::context::QueryContext; + + use super::*; + + #[test] + fn test_parse_gc_regions_params_with_full_file_listing() { + let params = vec![ + ValueRef::UInt64(1), + ValueRef::UInt64(2), + ValueRef::Boolean(true), + ]; + let (region_ids, full_file_listing) = parse_gc_regions_params(¶ms).unwrap(); + + assert_eq!(region_ids, vec![1, 2]); + assert!(full_file_listing); + } + + #[test] + fn test_parse_gc_regions_params_default_full_file_listing() { + let params = vec![ValueRef::UInt64(1), ValueRef::UInt32(2)]; + let (region_ids, full_file_listing) = parse_gc_regions_params(¶ms).unwrap(); + + assert_eq!(region_ids, vec![1, 2]); + assert!(!full_file_listing); + } + + #[test] + fn test_parse_gc_table_params_with_full_file_listing() { + let params = vec![ValueRef::String("public.t"), ValueRef::Boolean(true)]; + let (catalog, schema, table, full_file_listing) = + parse_gc_table_params(¶ms, &QueryContext::arc()).unwrap(); + + assert_eq!(catalog, "greptime"); + assert_eq!(schema, "public"); + assert_eq!(table, "t"); + assert!(full_file_listing); + } +} diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index b6eb07e686..93e37511d9 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -276,6 +276,7 @@ impl Inner { } async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result { + let timeout = request.timeout; let req = GcRegionsRequest { header: Some(RequestHeader { protocol_version: 0, @@ -285,7 +286,7 @@ impl Inner { }), region_ids: request.region_ids, full_file_listing: request.full_file_listing, - timeout_secs: request.timeout.as_secs() as u32, + timeout_secs: timeout.as_secs() as u32, }; let resp: GcRegionsResponse = self @@ -293,7 +294,7 @@ impl Inner { "gc_regions", move |mut client| { let mut req = Request::new(req.clone()); - req.set_timeout(self.timeout); + req.set_timeout(timeout); async move { client.gc_regions(req).await.map(|res| res.into_inner()) } }, |resp: &GcRegionsResponse| &resp.header, @@ -310,6 +311,7 @@ impl Inner { } async fn gc_table(&self, request: MetaGcTableRequest) -> Result { + let timeout = request.timeout; let req = GcTableRequest { header: Some(RequestHeader { protocol_version: 0, @@ -321,7 +323,7 @@ impl Inner { schema_name: request.schema_name, table_name: request.table_name, full_file_listing: request.full_file_listing, - timeout_secs: request.timeout.as_secs() as u32, + timeout_secs: timeout.as_secs() as u32, }; let resp: GcTableResponse = self @@ -329,7 +331,7 @@ impl Inner { "gc_table", move |mut client| { let mut req = Request::new(req.clone()); - req.set_timeout(self.timeout); + req.set_timeout(timeout); async move { client.gc_table(req).await.map(|res| res.into_inner()) } }, |resp: &GcTableResponse| &resp.header, diff --git a/tests-integration/src/tests/gc/admin.rs b/tests-integration/src/tests/gc/admin.rs index 9ac2bb8281..b9e08392f5 100644 --- a/tests-integration/src/tests/gc/admin.rs +++ b/tests-integration/src/tests/gc/admin.rs @@ -14,8 +14,6 @@ //! Integration tests for `ADMIN GC_TABLE` and `ADMIN GC_REGIONS` functions. -use std::time::Duration; - use client::OutputData; use common_meta::key::table_repart::TableRepartValue; use common_recordbatch::RecordBatches; @@ -93,9 +91,6 @@ async fn test_admin_gc_table(store_type: &StorageType) { let compact_sql = "ADMIN COMPACT_TABLE('test_admin_gc_table')"; execute_sql(&instance, compact_sql).await; - // Wait for compaction to complete - tokio::time::sleep(Duration::from_secs(2)).await; - // List SST files after compaction (should have both old and new files) let sst_files_after_compaction = list_sst_files(&test_context).await; info!( @@ -216,9 +211,6 @@ async fn test_admin_gc_regions(store_type: &StorageType) { let compact_sql = "ADMIN COMPACT_TABLE('test_admin_gc_regions')"; execute_sql(&instance, compact_sql).await; - // Wait for compaction to complete - tokio::time::sleep(Duration::from_secs(2)).await; - // List SST files after compaction (should have both old and new files) let sst_files_after_compaction = list_sst_files(&test_context).await; info!( @@ -370,7 +362,6 @@ async fn test_admin_gc_missing_cases(store_type: &StorageType) { let sst_before_compaction = list_sst_files(&test_context).await; let compact_sql = format!("ADMIN COMPACT_TABLE('{multi_region_table}')"); execute_sql(&instance, &compact_sql).await; - tokio::time::sleep(Duration::from_secs(2)).await; let sst_after_compaction = list_sst_files(&test_context).await; assert!(sst_after_compaction.len() >= sst_before_compaction.len()); let region_ids = fetch_region_ids(&instance, multi_region_table).await;