feat: allow manual gc to return error

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-03 14:35:54 +08:00
parent 2418afe347
commit 026525fcc9
3 changed files with 89 additions and 18 deletions

View File

@@ -86,13 +86,13 @@ impl GcJobReport {
/// Variants:
/// - `Tick`: This event is used to trigger gc periodically.
/// - `Manually`: This event is used to trigger a manual gc run and provides a channel
/// to send back the [`GcJobReport`] for that run.
/// to send back the result for that run.
/// Optional parameters allow specifying target regions and GC behavior.
pub enum Event {
Tick,
Manually {
/// Channel sender to return the GC job report
sender: oneshot::Sender<GcJobReport>,
/// Channel sender to return the GC job report or error
sender: oneshot::Sender<Result<GcJobReport>>,
/// Optional specific region IDs to GC. If None, scheduler will select candidates automatically.
region_ids: Option<Vec<RegionId>>,
/// Optional override for full file listing. If None, uses scheduler config.
@@ -187,21 +187,14 @@ impl GcScheduler {
info!("Received manually gc request");
let span =
common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual");
match self
let result = self
.handle_manual_gc(region_ids, full_file_listing, timeout)
.instrument(span)
.await
{
Ok(report) => {
// ignore error
let _ = sender.send(report);
}
Err(e) => {
error!(e; "Failed to handle manual gc");
// Send empty report on error to avoid blocking caller
let _ = sender.send(GcJobReport::default());
}
};
.await;
if let Err(e) = &result {
error!(e; "Failed to handle manual gc");
}
let _ = sender.send(result);
}
}
}
@@ -302,3 +295,81 @@ impl GcScheduler {
Ok(report)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::time::Duration;
use common_meta::datanode::RegionStat;
use common_meta::key::table_repart::TableRepartValue;
use common_meta::key::table_route::PhysicalTableRouteValue;
use store_api::storage::RegionId;
use table::metadata::TableId;
use super::*;
/// Test double for [`SchedulerCtx`] whose `gc_regions` call always fails.
///
/// Every metadata lookup returns an empty result, so the only error this
/// context can produce comes from `gc_regions`.
struct ErrorMockSchedulerCtx;

#[async_trait::async_trait]
impl SchedulerCtx for ErrorMockSchedulerCtx {
    /// Reports no region statistics for any table.
    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
        Ok(HashMap::new())
    }

    /// Reports no table repartition entries.
    async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
        Ok(Vec::new())
    }

    /// Panics if invoked: the test using this mock expects this lookup to be unused.
    async fn get_table_route(
        &self,
        _table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)> {
        unreachable!("get_table_route should not be called in this test")
    }

    /// Returns an empty route map regardless of the requested table ids.
    async fn batch_get_table_route(
        &self,
        _table_ids: &[TableId],
    ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
        Ok(HashMap::new())
    }

    /// Always fails with an `Unexpected` error carrying a fixed message,
    /// ignoring all arguments.
    async fn gc_regions(
        &self,
        _region_ids: &[RegionId],
        _full_file_listing: bool,
        _timeout: Duration,
        _region_routes_override: Region2Peers,
    ) -> Result<GcReport> {
        crate::error::UnexpectedSnafu {
            violated: String::from("mock gc failure"),
        }
        .fail()
    }
}
/// A manual GC run must return `Err` when the scheduler context's
/// `gc_regions` fails, rather than reporting success.
#[tokio::test]
async fn test_handle_manual_gc_propagates_error() {
    // Only the receiving half is needed to build the scheduler; the sending
    // half is released right away because `handle_manual_gc` is called directly.
    let (sender, receiver) = GcScheduler::channel();
    drop(sender);

    let scheduler = GcScheduler {
        ctx: Arc::new(ErrorMockSchedulerCtx),
        receiver,
        config: GcSchedulerOptions::default(),
        region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
        last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
    };

    // Explicit single-region request with an explicit listing mode and timeout.
    let target_regions = Some(vec![RegionId::new(1, 0)]);
    let full_file_listing = Some(false);
    let timeout = Some(Duration::from_secs(1));

    let outcome = scheduler
        .handle_manual_gc(target_regions, full_file_listing, timeout)
        .await;

    assert!(outcome.is_err());
}
}

View File

@@ -376,7 +376,7 @@ impl Metasrv {
violated: "GC job channel closed unexpectedly".to_string(),
}
.build()
})?;
})??;
let report = gc_job_report_to_gc_report(job_report);

View File

@@ -135,7 +135,7 @@ async fn trigger_full_gc(ticker: &GcTickerRef) {
})
.await
.unwrap();
let _ = rx.await.unwrap();
let _ = rx.await.unwrap().unwrap();
}
fn query_partitions_sql(table_name: &str) -> String {