per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-05 19:25:16 +08:00
parent 41fec4acbb
commit 19e1b84292
3 changed files with 128 additions and 54 deletions

View File

@@ -31,6 +31,7 @@ use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
const DEFAULT_GC_TIMEOUT: Duration = Duration::from_secs(60);
const DEFAULT_FULL_FILE_LISTING: bool = false;
#[admin_fn(
name = GcRegionsFunction,
@@ -43,30 +44,12 @@ pub(crate) async fn gc_regions(
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
!params.is_empty(),
InvalidFuncArgsSnafu {
err_msg: "The length of the args is not correct, expect at least 1 region id, have 0"
.to_string(),
}
);
let mut region_ids = Vec::with_capacity(params.len());
for param in params {
let Some(region_id) = cast_u64(param)? else {
return UnsupportedInputDataTypeSnafu {
function: "gc_regions",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
region_ids.push(region_id);
}
let (region_ids, full_file_listing) = parse_gc_regions_params(params)?;
let resp = procedure_service_handler
.gc_regions(GcRegionsRequest {
region_ids,
full_file_listing: false,
full_file_listing,
timeout: DEFAULT_GC_TIMEOUT,
})
.await?;
@@ -85,11 +68,68 @@ pub(crate) async fn gc_table(
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (catalog_name, schema_name, table_name, full_file_listing) =
parse_gc_table_params(params, query_ctx)?;
let resp = procedure_service_handler
.gc_table(GcTableRequest {
catalog_name,
schema_name,
table_name,
full_file_listing,
timeout: DEFAULT_GC_TIMEOUT,
})
.await?;
Ok(Value::from(resp.processed_regions))
}
fn parse_gc_regions_params(params: &[ValueRef<'_>]) -> Result<(Vec<u64>, bool)> {
ensure!(
params.len() == 1,
!params.is_empty(),
InvalidFuncArgsSnafu {
err_msg: "The length of the args is not correct, expect at least 1 region id, have 0"
.to_string(),
}
);
let (full_file_listing, region_params) = match params.last() {
Some(ValueRef::Boolean(value)) => (*value, &params[..params.len() - 1]),
_ => (DEFAULT_FULL_FILE_LISTING, params),
};
ensure!(
!region_params.is_empty(),
InvalidFuncArgsSnafu {
err_msg: "The length of the args is not correct, expect at least 1 region id"
.to_string(),
}
);
let mut region_ids = Vec::with_capacity(region_params.len());
for param in region_params {
let Some(region_id) = cast_u64(param)? else {
return UnsupportedInputDataTypeSnafu {
function: "gc_regions",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
region_ids.push(region_id);
}
Ok((region_ids, full_file_listing))
}
fn parse_gc_table_params(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String, String, bool)> {
ensure!(
matches!(params.len(), 1 | 2),
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
"The length of the args is not correct, expect 1 or 2, have: {}",
params.len()
),
}
@@ -103,37 +143,78 @@ pub(crate) async fn gc_table(
.fail();
};
let full_file_listing = if params.len() == 2 {
let ValueRef::Boolean(value) = params[1] else {
return UnsupportedInputDataTypeSnafu {
function: "gc_table",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
value
} else {
DEFAULT_FULL_FILE_LISTING
};
let (catalog_name, schema_name, table_name) =
session::table_name::table_name_to_full_name(table_name, query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;
let resp = procedure_service_handler
.gc_table(GcTableRequest {
catalog_name,
schema_name,
table_name,
full_file_listing: false,
timeout: DEFAULT_GC_TIMEOUT,
})
.await?;
Ok(Value::from(resp.processed_regions))
Ok((catalog_name, schema_name, table_name, full_file_listing))
}
fn gc_regions_signature() -> Signature {
Signature::variadic(
ConcreteDataType::numerics()
.into_iter()
.map(|dt| dt.as_arrow_type())
.collect(),
Volatility::Immutable,
)
Signature::variadic_any(Volatility::Immutable)
}
fn gc_table_signature() -> Signature {
Signature::one_of(
vec![TypeSignature::Uniform(1, vec![ArrowDataType::Utf8])],
vec![
TypeSignature::Uniform(1, vec![ArrowDataType::Utf8]),
TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Boolean]),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use session::context::QueryContext;
use super::*;
#[test]
fn test_parse_gc_regions_params_with_full_file_listing() {
let params = vec![
ValueRef::UInt64(1),
ValueRef::UInt64(2),
ValueRef::Boolean(true),
];
let (region_ids, full_file_listing) = parse_gc_regions_params(&params).unwrap();
assert_eq!(region_ids, vec![1, 2]);
assert!(full_file_listing);
}
#[test]
fn test_parse_gc_regions_params_default_full_file_listing() {
let params = vec![ValueRef::UInt64(1), ValueRef::UInt32(2)];
let (region_ids, full_file_listing) = parse_gc_regions_params(&params).unwrap();
assert_eq!(region_ids, vec![1, 2]);
assert!(!full_file_listing);
}
#[test]
fn test_parse_gc_table_params_with_full_file_listing() {
let params = vec![ValueRef::String("public.t"), ValueRef::Boolean(true)];
let (catalog, schema, table, full_file_listing) =
parse_gc_table_params(&params, &QueryContext::arc()).unwrap();
assert_eq!(catalog, "greptime");
assert_eq!(schema, "public");
assert_eq!(table, "t");
assert!(full_file_listing);
}
}

View File

@@ -276,6 +276,7 @@ impl Inner {
}
async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse> {
let timeout = request.timeout;
let req = GcRegionsRequest {
header: Some(RequestHeader {
protocol_version: 0,
@@ -285,7 +286,7 @@ impl Inner {
}),
region_ids: request.region_ids,
full_file_listing: request.full_file_listing,
timeout_secs: request.timeout.as_secs() as u32,
timeout_secs: timeout.as_secs() as u32,
};
let resp: GcRegionsResponse = self
@@ -293,7 +294,7 @@ impl Inner {
"gc_regions",
move |mut client| {
let mut req = Request::new(req.clone());
req.set_timeout(self.timeout);
req.set_timeout(timeout);
async move { client.gc_regions(req).await.map(|res| res.into_inner()) }
},
|resp: &GcRegionsResponse| &resp.header,
@@ -310,6 +311,7 @@ impl Inner {
}
async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse> {
let timeout = request.timeout;
let req = GcTableRequest {
header: Some(RequestHeader {
protocol_version: 0,
@@ -321,7 +323,7 @@ impl Inner {
schema_name: request.schema_name,
table_name: request.table_name,
full_file_listing: request.full_file_listing,
timeout_secs: request.timeout.as_secs() as u32,
timeout_secs: timeout.as_secs() as u32,
};
let resp: GcTableResponse = self
@@ -329,7 +331,7 @@ impl Inner {
"gc_table",
move |mut client| {
let mut req = Request::new(req.clone());
req.set_timeout(self.timeout);
req.set_timeout(timeout);
async move { client.gc_table(req).await.map(|res| res.into_inner()) }
},
|resp: &GcTableResponse| &resp.header,

View File

@@ -14,8 +14,6 @@
//! Integration tests for `ADMIN GC_TABLE` and `ADMIN GC_REGIONS` functions.
use std::time::Duration;
use client::OutputData;
use common_meta::key::table_repart::TableRepartValue;
use common_recordbatch::RecordBatches;
@@ -93,9 +91,6 @@ async fn test_admin_gc_table(store_type: &StorageType) {
let compact_sql = "ADMIN COMPACT_TABLE('test_admin_gc_table')";
execute_sql(&instance, compact_sql).await;
// Wait for compaction to complete
tokio::time::sleep(Duration::from_secs(2)).await;
// List SST files after compaction (should have both old and new files)
let sst_files_after_compaction = list_sst_files(&test_context).await;
info!(
@@ -216,9 +211,6 @@ async fn test_admin_gc_regions(store_type: &StorageType) {
let compact_sql = "ADMIN COMPACT_TABLE('test_admin_gc_regions')";
execute_sql(&instance, compact_sql).await;
// Wait for compaction to complete
tokio::time::sleep(Duration::from_secs(2)).await;
// List SST files after compaction (should have both old and new files)
let sst_files_after_compaction = list_sst_files(&test_context).await;
info!(
@@ -370,7 +362,6 @@ async fn test_admin_gc_missing_cases(store_type: &StorageType) {
let sst_before_compaction = list_sst_files(&test_context).await;
let compact_sql = format!("ADMIN COMPACT_TABLE('{multi_region_table}')");
execute_sql(&instance, &compact_sql).await;
tokio::time::sleep(Duration::from_secs(2)).await;
let sst_after_compaction = list_sst_files(&test_context).await;
assert!(sst_after_compaction.len() >= sst_before_compaction.len());
let region_ids = fetch_region_ids(&instance, multi_region_table).await;