diff --git a/src/meta-srv/src/gc.rs b/src/meta-srv/src/gc.rs index c9d346e5f8..f9b3869bc6 100644 --- a/src/meta-srv/src/gc.rs +++ b/src/meta-srv/src/gc.rs @@ -31,7 +31,7 @@ mod util; pub use options::GcSchedulerOptions; pub use procedure::BatchGcProcedure; -pub use scheduler::{Event, GcScheduler, GcTickerRef}; +pub use scheduler::{Event, GcJobReport, GcScheduler, GcTickerRef}; /// Mapping from region ID to its associated peers (leader and followers). pub type Region2Peers = HashMap)>; diff --git a/src/meta-srv/src/gc/scheduler.rs b/src/meta-srv/src/gc/scheduler.rs index a275c64452..8f09edcb5a 100644 --- a/src/meta-srv/src/gc/scheduler.rs +++ b/src/meta-srv/src/gc/scheduler.rs @@ -14,20 +14,21 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use common_meta::DatanodeId; use common_meta::key::TableMetadataManagerRef; use common_procedure::ProcedureManagerRef; use common_telemetry::tracing::Instrument as _; use common_telemetry::{error, info}; -use store_api::storage::GcReport; +use store_api::storage::{GcReport, RegionId}; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{Mutex, oneshot}; use crate::cluster::MetaPeerClientRef; use crate::define_ticker; use crate::error::{Error, Result}; +use crate::gc::Region2Peers; use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx}; use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL}; use crate::gc::tracker::RegionGcTracker; @@ -49,9 +50,19 @@ pub struct GcJobReport { /// - `Tick`: This event is used to trigger gc periodically. /// - `Manually`: This event is used to trigger a manual gc run and provides a channel /// to send back the [`GcJobReport`] for that run. +/// Optional parameters allow specifying target regions and GC behavior. pub enum Event { Tick, - Manually(oneshot::Sender), + Manually { + /// Channel sender to return the GC job report + sender: oneshot::Sender, + /// Optional specific region IDs to GC. If None, scheduler will select candidates automatically. + region_ids: Option>, + /// Optional override for full file listing. If None, uses scheduler config. + full_file_listing: Option, + /// Optional override for timeout. If None, uses scheduler config. + timeout: Option, + }, } #[allow(unused)] @@ -130,17 +141,27 @@ impl GcScheduler { error!(e; "Failed to handle gc tick"); } } - Event::Manually(sender) => { + Event::Manually { + sender, + region_ids, + full_file_listing, + timeout, + } => { info!("Received manually gc request"); let span = common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual"); - match self.handle_tick().instrument(span).await { + match self + .handle_manual_gc(region_ids, full_file_listing, timeout) + .await + { Ok(report) => { // ignore error let _ = sender.send(report); } Err(e) => { - error!(e; "Failed to handle gc tick"); + error!(e; "Failed to handle manual gc"); + // Send empty report on error to avoid blocking caller + let _ = sender.send(GcJobReport::default()); } }; } @@ -162,4 +183,40 @@ impl GcScheduler { Ok(report) } + + /// Handles a manual GC request with optional specific parameters. + /// + /// If `region_ids` is specified, GC will be performed only on those regions. + /// Otherwise, falls back to automatic candidate selection. + pub(crate) async fn handle_manual_gc( + &self, + region_ids: Option>, + full_file_listing: Option, + timeout: Option, + ) -> Result { + info!("Start to handle manual gc request"); + + let report = if let Some(regions) = region_ids { + let full_listing = full_file_listing.unwrap_or(false); + let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout); + + let gc_report = self + .ctx + .gc_regions(®ions, full_listing, gc_timeout, Region2Peers::new()) + .await?; + + let mut per_datanode_reports = HashMap::new(); + per_datanode_reports.insert(0, gc_report); + GcJobReport { + per_datanode_reports, + failed_datanodes: HashMap::new(), + } + } else { + // No specific regions, use default tick behavior + self.trigger_gc().await? + }; + + info!("Finished manual gc request"); + Ok(report) + } } diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index 60f74df43a..eb37bbddd9 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -30,20 +30,18 @@ use common_meta::rpc::procedure::{ self, GcRegionsRequest as MetaGcRegionsRequest, GcResponse, GcTableRequest as MetaGcTableRequest, }; -use common_procedure::{ProcedureWithId, watcher}; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use table::table_reference::TableReference; use tonic::Request; use crate::error::{TableMetadataManagerSnafu, TableNotFoundSnafu}; -use crate::gc::{BatchGcProcedure, Region2Peers}; use crate::metasrv::Metasrv; use crate::procedure::region_migration::manager::{ RegionMigrationProcedureTask, RegionMigrationTriggerReason, }; use crate::service::GrpcResult; -use crate::{check_leader, error}; +use crate::{check_leader, error, gc}; #[async_trait::async_trait] impl procedure_service_server::ProcedureService for Metasrv { @@ -307,18 +305,42 @@ impl procedure_service_server::ProcedureService for Metasrv { impl Metasrv { async fn handle_gc_regions(&self, request: MetaGcRegionsRequest) -> error::Result { - let region_ids = request + let region_ids: Vec = request .region_ids .into_iter() .map(RegionId::from_u64) - .collect::>(); - let report = self - .run_gc_procedure( - region_ids.clone(), - request.full_file_listing, - request.timeout, - ) - .await?; + .collect(); + + // Use GcTickerRef to trigger manual GC + let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu { + violated: "GC ticker not available".to_string(), + })?; + + let (tx, rx) = tokio::sync::oneshot::channel(); + gc_ticker + .sender + .send(gc::Event::Manually { + sender: tx, + region_ids: Some(region_ids.clone()), + full_file_listing: Some(request.full_file_listing), + timeout: Some(request.timeout), + }) + .await + .map_err(|_| { + error::UnexpectedSnafu { + violated: "Failed to send GC event".to_string(), + } + .build() + })?; + + let job_report = rx.await.map_err(|_| { + error::UnexpectedSnafu { + violated: "GC job channel closed unexpectedly".to_string(), + } + .build() + })?; + + let report = gc_job_report_to_gc_report(job_report); Ok(gc_report_to_response(&report, region_ids.len() as u64)) } @@ -347,55 +369,50 @@ impl Metasrv { .await .context(TableMetadataManagerSnafu)?; - let region_ids = route - .region_routes - .iter() - .map(|r| r.region.id) - .collect::>(); + let region_ids: Vec = route.region_routes.iter().map(|r| r.region.id).collect(); - let report = self - .run_gc_procedure( - region_ids.clone(), - request.full_file_listing, - request.timeout, - ) - .await?; + // Use GcTickerRef to trigger manual GC + let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu { + violated: "GC ticker not available".to_string(), + })?; + + let (tx, rx) = tokio::sync::oneshot::channel(); + gc_ticker + .sender + .send(gc::Event::Manually { + sender: tx, + region_ids: Some(region_ids.clone()), + full_file_listing: Some(request.full_file_listing), + timeout: Some(request.timeout), + }) + .await + .map_err(|_| { + error::UnexpectedSnafu { + violated: "Failed to send GC event".to_string(), + } + .build() + })?; + + let job_report = rx.await.map_err(|_| { + error::UnexpectedSnafu { + violated: "GC job channel closed unexpectedly".to_string(), + } + .build() + })?; + + let report = gc_job_report_to_gc_report(job_report); Ok(gc_report_to_response(&report, region_ids.len() as u64)) } +} - async fn run_gc_procedure( - &self, - region_ids: Vec, - full_file_listing: bool, - timeout: Duration, - ) -> error::Result { - let procedure = BatchGcProcedure::new( - self.mailbox().clone(), - self.table_metadata_manager().clone(), - self.options().grpc.server_addr.clone(), - region_ids, - full_file_listing, - timeout, - Region2Peers::new(), - ); - - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - let mut watcher = self - .procedure_manager() - .submit(procedure_with_id) - .await - .context(error::SubmitProcedureSnafu)?; - - let res = watcher::wait(&mut watcher) - .await - .context(error::WaitProcedureSnafu)? - .with_context(|| error::UnexpectedSnafu { - violated: "GC procedure completed but returned no result".to_string(), - })?; - - BatchGcProcedure::cast_result(res) +fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api::storage::GcReport { + // Merge all datanode reports into a single GcReport + let mut gc_report = store_api::storage::GcReport::default(); + for (_datanode_id, report) in job_report.per_datanode_reports { + gc_report.merge(report); } + gc_report } fn gc_report_to_response( diff --git a/tests-integration/tests/repartition.rs b/tests-integration/tests/repartition.rs index 4347417b9a..b7f6acd315 100644 --- a/tests-integration/tests/repartition.rs +++ b/tests-integration/tests/repartition.rs @@ -125,7 +125,16 @@ async fn trigger_table_gc(metasrv: &Arc, table_name: &str) { async fn trigger_full_gc(ticker: &GcTickerRef) { info!("triggering full gc"); let (tx, rx) = oneshot::channel(); - ticker.sender.send(gc::Event::Manually(tx)).await.unwrap(); + ticker + .sender + .send(gc::Event::Manually { + sender: tx, + region_ids: None, + full_file_listing: None, + timeout: None, + }) + .await + .unwrap(); let _ = rx.await.unwrap(); }