refactor: use gc ticker for admin gc

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-02-04 21:45:18 +08:00
parent 60e5b8bdc6
commit 06dafc57b5
4 changed files with 146 additions and 63 deletions

View File

@@ -31,7 +31,7 @@ mod util;
pub use options::GcSchedulerOptions;
pub use procedure::BatchGcProcedure;
pub use scheduler::{Event, GcScheduler, GcTickerRef};
pub use scheduler::{Event, GcJobReport, GcScheduler, GcTickerRef};
/// Mapping from region ID to its associated peers (leader and followers).
pub type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;

View File

@@ -14,20 +14,21 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use common_meta::DatanodeId;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::tracing::Instrument as _;
use common_telemetry::{error, info};
use store_api::storage::GcReport;
use store_api::storage::{GcReport, RegionId};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, oneshot};
use crate::cluster::MetaPeerClientRef;
use crate::define_ticker;
use crate::error::{Error, Result};
use crate::gc::Region2Peers;
use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
use crate::gc::tracker::RegionGcTracker;
@@ -49,9 +50,19 @@ pub struct GcJobReport {
/// - `Tick`: This event is used to trigger gc periodically.
/// - `Manually`: This event is used to trigger a manual gc run and provides a channel
/// to send back the [`GcJobReport`] for that run.
/// Optional parameters allow specifying target regions and GC behavior.
pub enum Event {
Tick,
Manually(oneshot::Sender<GcJobReport>),
Manually {
/// Channel sender to return the GC job report
sender: oneshot::Sender<GcJobReport>,
/// Optional specific region IDs to GC. If None, scheduler will select candidates automatically.
region_ids: Option<Vec<RegionId>>,
/// Optional override for full file listing. If None, uses scheduler config.
full_file_listing: Option<bool>,
/// Optional override for timeout. If None, uses scheduler config.
timeout: Option<Duration>,
},
}
#[allow(unused)]
@@ -130,17 +141,27 @@ impl GcScheduler {
error!(e; "Failed to handle gc tick");
}
}
Event::Manually(sender) => {
Event::Manually {
sender,
region_ids,
full_file_listing,
timeout,
} => {
info!("Received manually gc request");
let span =
common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual");
match self.handle_tick().instrument(span).await {
match self
.handle_manual_gc(region_ids, full_file_listing, timeout)
.await
{
Ok(report) => {
// ignore error
let _ = sender.send(report);
}
Err(e) => {
error!(e; "Failed to handle gc tick");
error!(e; "Failed to handle manual gc");
// Send empty report on error to avoid blocking caller
let _ = sender.send(GcJobReport::default());
}
};
}
@@ -162,4 +183,40 @@ impl GcScheduler {
Ok(report)
}
/// Handles a manual GC request with optional specific parameters.
///
/// If `region_ids` is specified, GC will be performed only on those regions.
/// Otherwise, falls back to automatic candidate selection.
pub(crate) async fn handle_manual_gc(
&self,
region_ids: Option<Vec<RegionId>>,
full_file_listing: Option<bool>,
timeout: Option<Duration>,
) -> Result<GcJobReport> {
info!("Start to handle manual gc request");
let report = if let Some(regions) = region_ids {
let full_listing = full_file_listing.unwrap_or(false);
let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout);
let gc_report = self
.ctx
.gc_regions(&regions, full_listing, gc_timeout, Region2Peers::new())
.await?;
let mut per_datanode_reports = HashMap::new();
per_datanode_reports.insert(0, gc_report);
GcJobReport {
per_datanode_reports,
failed_datanodes: HashMap::new(),
}
} else {
// No specific regions, use default tick behavior
self.trigger_gc().await?
};
info!("Finished manual gc request");
Ok(report)
}
}

View File

@@ -30,20 +30,18 @@ use common_meta::rpc::procedure::{
self, GcRegionsRequest as MetaGcRegionsRequest, GcResponse,
GcTableRequest as MetaGcTableRequest,
};
use common_procedure::{ProcedureWithId, watcher};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::table_reference::TableReference;
use tonic::Request;
use crate::error::{TableMetadataManagerSnafu, TableNotFoundSnafu};
use crate::gc::{BatchGcProcedure, Region2Peers};
use crate::metasrv::Metasrv;
use crate::procedure::region_migration::manager::{
RegionMigrationProcedureTask, RegionMigrationTriggerReason,
};
use crate::service::GrpcResult;
use crate::{check_leader, error};
use crate::{check_leader, error, gc};
#[async_trait::async_trait]
impl procedure_service_server::ProcedureService for Metasrv {
@@ -307,18 +305,42 @@ impl procedure_service_server::ProcedureService for Metasrv {
impl Metasrv {
async fn handle_gc_regions(&self, request: MetaGcRegionsRequest) -> error::Result<GcResponse> {
let region_ids = request
let region_ids: Vec<RegionId> = request
.region_ids
.into_iter()
.map(RegionId::from_u64)
.collect::<Vec<_>>();
let report = self
.run_gc_procedure(
region_ids.clone(),
request.full_file_listing,
request.timeout,
)
.await?;
.collect();
// Use GcTickerRef to trigger manual GC
let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu {
violated: "GC ticker not available".to_string(),
})?;
let (tx, rx) = tokio::sync::oneshot::channel();
gc_ticker
.sender
.send(gc::Event::Manually {
sender: tx,
region_ids: Some(region_ids.clone()),
full_file_listing: Some(request.full_file_listing),
timeout: Some(request.timeout),
})
.await
.map_err(|_| {
error::UnexpectedSnafu {
violated: "Failed to send GC event".to_string(),
}
.build()
})?;
let job_report = rx.await.map_err(|_| {
error::UnexpectedSnafu {
violated: "GC job channel closed unexpectedly".to_string(),
}
.build()
})?;
let report = gc_job_report_to_gc_report(job_report);
Ok(gc_report_to_response(&report, region_ids.len() as u64))
}
@@ -347,55 +369,50 @@ impl Metasrv {
.await
.context(TableMetadataManagerSnafu)?;
let region_ids = route
.region_routes
.iter()
.map(|r| r.region.id)
.collect::<Vec<_>>();
let region_ids: Vec<RegionId> = route.region_routes.iter().map(|r| r.region.id).collect();
let report = self
.run_gc_procedure(
region_ids.clone(),
request.full_file_listing,
request.timeout,
)
.await?;
// Use GcTickerRef to trigger manual GC
let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu {
violated: "GC ticker not available".to_string(),
})?;
let (tx, rx) = tokio::sync::oneshot::channel();
gc_ticker
.sender
.send(gc::Event::Manually {
sender: tx,
region_ids: Some(region_ids.clone()),
full_file_listing: Some(request.full_file_listing),
timeout: Some(request.timeout),
})
.await
.map_err(|_| {
error::UnexpectedSnafu {
violated: "Failed to send GC event".to_string(),
}
.build()
})?;
let job_report = rx.await.map_err(|_| {
error::UnexpectedSnafu {
violated: "GC job channel closed unexpectedly".to_string(),
}
.build()
})?;
let report = gc_job_report_to_gc_report(job_report);
Ok(gc_report_to_response(&report, region_ids.len() as u64))
}
}
async fn run_gc_procedure(
&self,
region_ids: Vec<RegionId>,
full_file_listing: bool,
timeout: Duration,
) -> error::Result<store_api::storage::GcReport> {
let procedure = BatchGcProcedure::new(
self.mailbox().clone(),
self.table_metadata_manager().clone(),
self.options().grpc.server_addr.clone(),
region_ids,
full_file_listing,
timeout,
Region2Peers::new(),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let mut watcher = self
.procedure_manager()
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu)?;
let res = watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu)?
.with_context(|| error::UnexpectedSnafu {
violated: "GC procedure completed but returned no result".to_string(),
})?;
BatchGcProcedure::cast_result(res)
fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api::storage::GcReport {
// Merge all datanode reports into a single GcReport
let mut gc_report = store_api::storage::GcReport::default();
for (_datanode_id, report) in job_report.per_datanode_reports {
gc_report.merge(report);
}
gc_report
}
fn gc_report_to_response(

View File

@@ -125,7 +125,16 @@ async fn trigger_table_gc(metasrv: &Arc<Metasrv>, table_name: &str) {
async fn trigger_full_gc(ticker: &GcTickerRef) {
info!("triggering full gc");
let (tx, rx) = oneshot::channel();
ticker.sender.send(gc::Event::Manually(tx)).await.unwrap();
ticker
.sender
.send(gc::Event::Manually {
sender: tx,
region_ids: None,
full_file_listing: None,
timeout: None,
})
.await
.unwrap();
let _ = rx.await.unwrap();
}