diff --git a/src/meta-srv/src/gc/scheduler.rs b/src/meta-srv/src/gc/scheduler.rs index 0a04b16439..772c7e7cb6 100644 --- a/src/meta-srv/src/gc/scheduler.rs +++ b/src/meta-srv/src/gc/scheduler.rs @@ -86,13 +86,13 @@ impl GcJobReport { /// Variants: /// - `Tick`: This event is used to trigger gc periodically. /// - `Manually`: This event is used to trigger a manual gc run and provides a channel -/// to send back the [`GcJobReport`] for that run. +/// to send back the result for that run. /// Optional parameters allow specifying target regions and GC behavior. pub enum Event { Tick, Manually { - /// Channel sender to return the GC job report - sender: oneshot::Sender, + /// Channel sender to return the GC job report or error + sender: oneshot::Sender>, /// Optional specific region IDs to GC. If None, scheduler will select candidates automatically. region_ids: Option>, /// Optional override for full file listing. If None, uses scheduler config. @@ -187,21 +187,14 @@ impl GcScheduler { info!("Received manually gc request"); let span = common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual"); - match self + let result = self .handle_manual_gc(region_ids, full_file_listing, timeout) .instrument(span) - .await - { - Ok(report) => { - // ignore error - let _ = sender.send(report); - } - Err(e) => { - error!(e; "Failed to handle manual gc"); - // Send empty report on error to avoid blocking caller - let _ = sender.send(GcJobReport::default()); - } - }; + .await; + if let Err(e) = &result { + error!(e; "Failed to handle manual gc"); + } + let _ = sender.send(result); } } } @@ -302,3 +295,81 @@ impl GcScheduler { Ok(report) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::time::Duration; + + use common_meta::datanode::RegionStat; + use common_meta::key::table_repart::TableRepartValue; + use common_meta::key::table_route::PhysicalTableRouteValue; + use store_api::storage::RegionId; + use table::metadata::TableId; + + use super::*; + + struct ErrorMockSchedulerCtx; + + #[async_trait::async_trait] + impl SchedulerCtx for ErrorMockSchedulerCtx { + async fn get_table_to_region_stats(&self) -> Result>> { + Ok(HashMap::new()) + } + + async fn get_table_reparts(&self) -> Result> { + Ok(vec![]) + } + + async fn get_table_route( + &self, + _table_id: TableId, + ) -> Result<(TableId, PhysicalTableRouteValue)> { + unreachable!("get_table_route should not be called in this test") + } + + async fn batch_get_table_route( + &self, + _table_ids: &[TableId], + ) -> Result> { + Ok(HashMap::new()) + } + + async fn gc_regions( + &self, + _region_ids: &[RegionId], + _full_file_listing: bool, + _timeout: Duration, + _region_routes_override: Region2Peers, + ) -> Result { + crate::error::UnexpectedSnafu { + violated: "mock gc failure".to_string(), + } + .fail() + } + } + + #[tokio::test] + async fn test_handle_manual_gc_propagates_error() { + let (tx, rx) = GcScheduler::channel(); + drop(tx); + + let scheduler = GcScheduler { + ctx: Arc::new(ErrorMockSchedulerCtx), + receiver: rx, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())), + }; + + let result = scheduler + .handle_manual_gc( + Some(vec![RegionId::new(1, 0)]), + Some(false), + Some(Duration::from_secs(1)), + ) + .await; + + assert!(result.is_err()); + } +} diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index e09f0ec1bc..21c573c518 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -376,7 +376,7 @@ impl Metasrv { violated: "GC job channel closed unexpectedly".to_string(), } .build() - })?; + })??; let report = gc_job_report_to_gc_report(job_report); diff --git a/tests-integration/tests/repartition.rs b/tests-integration/tests/repartition.rs index b7f6acd315..ce8ec5bf96 100644 --- a/tests-integration/tests/repartition.rs +++ b/tests-integration/tests/repartition.rs @@ -135,7 +135,7 @@ async fn trigger_full_gc(ticker: &GcTickerRef) { }) .await .unwrap(); - let _ = rx.await.unwrap(); + let _ = rx.await.unwrap().unwrap(); } fn query_partitions_sql(table_name: &str) -> String {