From 52a576cf6dd98ca8f9c1cd0da4cfdffe072ea6ce Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Mon, 24 Nov 2025 15:57:18 +0800
Subject: [PATCH] feat: basic gc scheduler (#7263)

* feat: basic gc scheduler

Signed-off-by: discord9

* refactor: rm dup code

Signed-off-by: discord9

* docs: todo for cleaner code

Signed-off-by: discord9

* chore

Signed-off-by: discord9

* feat: rm retry path

Signed-off-by: discord9

* per review

Signed-off-by: discord9

* feat: skip first full listing after metasrv start

Signed-off-by: discord9

---------

Signed-off-by: discord9
---
 src/meta-srv/src/gc.rs              |   7 +
 src/meta-srv/src/gc/candidate.rs    |  86 ++++++
 src/meta-srv/src/gc/ctx.rs          |   3 -
 src/meta-srv/src/gc/handler.rs      | 459 ++++++++++++++++++++++++++++
 src/meta-srv/src/gc/options.rs      | 171 +++++++++++
 src/meta-srv/src/gc/scheduler.rs    | 162 ++++++++++
 src/meta-srv/src/gc/tracker.rs      | 129 ++++++++
 src/meta-srv/src/metasrv.rs         |   8 +
 src/meta-srv/src/metasrv/builder.rs |  18 ++
 src/mito2/src/gc.rs                 |  35 +--
 10 files changed, 1044 insertions(+), 34 deletions(-)
 create mode 100644 src/meta-srv/src/gc/handler.rs
 create mode 100644 src/meta-srv/src/gc/options.rs
 create mode 100644 src/meta-srv/src/gc/scheduler.rs
 create mode 100644 src/meta-srv/src/gc/tracker.rs

diff --git a/src/meta-srv/src/gc.rs b/src/meta-srv/src/gc.rs
index f44cc084e2..0b9b6fe438 100644
--- a/src/meta-srv/src/gc.rs
+++ b/src/meta-srv/src/gc.rs
@@ -22,7 +22,14 @@ use store_api::storage::RegionId;
 
 mod candidate;
 mod ctx;
+mod handler;
+mod options;
 mod procedure;
+mod scheduler;
+mod tracker;
+
+pub(crate) use options::GcSchedulerOptions;
+pub(crate) use scheduler::{GcScheduler, GcTickerRef};
 
 pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
 
diff --git a/src/meta-srv/src/gc/candidate.rs b/src/meta-srv/src/gc/candidate.rs
index ac52d9f81f..7d9ac9558b 100644
--- a/src/meta-srv/src/gc/candidate.rs
+++ b/src/meta-srv/src/gc/candidate.rs
@@ -23,6 +23,7 @@ use store_api::storage::RegionId;
 use table::metadata::TableId;
 
 use crate::error::Result;
+use crate::gc::scheduler::GcScheduler;
 
 /// Represents a region candidate for GC with its priority score.
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -46,3 +47,88 @@ impl GcCandidate {
         self.score.into_inner()
     }
 }
+
+impl GcScheduler {
+    /// Calculate the GC priority score for a region based on various metrics.
+    fn calculate_gc_score(&self, region_stat: &RegionStat) -> f64 {
+        let sst_count_score = region_stat.sst_num as f64 * self.config.sst_count_weight;
+
+        let file_remove_cnt_score = match &region_stat.region_manifest {
+            RegionManifestInfo::Mito {
+                file_removed_cnt, ..
+            } => *file_removed_cnt as f64 * self.config.file_removed_count_weight,
+            // The metric engine doesn't track removed files; this arm should also be
+            // unreachable since the metric engine doesn't support GC.
+            RegionManifestInfo::Metric { .. } => 0.0,
+        };
+
+        sst_count_score + file_remove_cnt_score
+    }
+
+    /// Filter and score regions that are candidates for GC, grouped by table.
+    pub(crate) async fn select_gc_candidates(
+        &self,
+        table_to_region_stats: &HashMap<TableId, Vec<RegionStat>>,
+    ) -> Result<HashMap<TableId, Vec<GcCandidate>>> {
+        let mut table_candidates: HashMap<TableId, Vec<GcCandidate>> = HashMap::new();
+        let now = Instant::now();
+
+        for (table_id, region_stats) in table_to_region_stats {
+            let mut candidates = Vec::new();
+            let tracker = self.region_gc_tracker.lock().await;
+
+            for region_stat in region_stats {
+                if region_stat.role != RegionRole::Leader {
+                    continue;
+                }
+
+                // Skip regions that are too small
+                if region_stat.approximate_bytes < self.config.min_region_size_threshold {
+                    continue;
+                }
+
+                // Skip regions that are in cooldown period
+                if let Some(gc_info) = tracker.get(&region_stat.id)
+                    && now.duration_since(gc_info.last_gc_time) < self.config.gc_cooldown_period
+                {
+                    debug!("Skipping region {} due to cooldown", region_stat.id);
+                    continue;
+                }
+
+                let score = self.calculate_gc_score(region_stat);
+
+                debug!(
+                    "Region {} (table {}) has GC score {:.4}",
+                    region_stat.id, table_id, score
+                );
+
+                // Only consider regions with a meaningful score
+                if score > 0.0 {
+                    candidates.push(GcCandidate::new(region_stat.id, score, region_stat.clone()));
+                }
+            }
+
+            // Sort candidates by score in descending order and take top N
+            candidates.sort_by(|a, b| b.score.cmp(&a.score));
+            let top_candidates: Vec<GcCandidate> = candidates
+                .into_iter()
+                .take(self.config.regions_per_table_threshold)
+                .collect();
+
+            if !top_candidates.is_empty() {
+                info!(
+                    "Selected {} GC candidates for table {} (top {} out of all qualified)",
+                    top_candidates.len(),
+                    table_id,
+                    self.config.regions_per_table_threshold
+                );
+                table_candidates.insert(*table_id, top_candidates);
+            }
+        }
+
+        info!(
+            "Selected GC candidates for {} tables",
+            table_candidates.len()
+        );
+        Ok(table_candidates)
+    }
+}
diff --git a/src/meta-srv/src/gc/ctx.rs b/src/meta-srv/src/gc/ctx.rs
index 848e7f2c28..8583b6375b 100644
--- a/src/meta-srv/src/gc/ctx.rs
+++ b/src/meta-srv/src/gc/ctx.rs
@@ -12,9 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// TODO(discord9): remove this once gc scheduler is fully merged
-#![allow(unused)]
-
 use std::collections::{HashMap, HashSet};
 use std::time::Duration;
 
diff --git a/src/meta-srv/src/gc/handler.rs b/src/meta-srv/src/gc/handler.rs
new file mode 100644
index 0000000000..c5574f1adb
--- /dev/null
+++ b/src/meta-srv/src/gc/handler.rs
@@ -0,0 +1,459 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::time::Instant;
+
+use common_meta::key::table_route::PhysicalTableRouteValue;
+use common_meta::peer::Peer;
+use common_telemetry::{debug, error, info, warn};
+use futures::StreamExt;
+use itertools::Itertools;
+use store_api::storage::{FileRefsManifest, GcReport, RegionId};
+use table::metadata::TableId;
+use tokio::time::sleep;
+
+use crate::error::Result;
+use crate::gc::candidate::GcCandidate;
+use crate::gc::scheduler::{GcJobReport, GcScheduler};
+use crate::gc::tracker::RegionGcInfo;
+use crate::region;
+
+pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
+
+pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;
+
+impl GcScheduler {
+    /// Iterate through all region stats, find regions that might need GC, and send GC
+    /// instructions to the corresponding datanodes with improved parallel processing and retry logic.
+    pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
+        let start_time = Instant::now();
+        info!("Starting GC cycle");
+
+        // Step 1: Get all region statistics
+        let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
+        info!(
+            "Fetched region stats for {} tables",
+            table_to_region_stats.len()
+        );
+
+        // Step 2: Select GC candidates based on our scoring algorithm
+        let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;
+
+        if per_table_candidates.is_empty() {
+            info!("No GC candidates found, skipping GC cycle");
+            return Ok(Default::default());
+        }
+
+        // Step 3: Aggregate candidates by datanode
+        let datanode_to_candidates = self
+            .aggregate_candidates_by_datanode(per_table_candidates)
+            .await?;
+
+        if datanode_to_candidates.is_empty() {
+            info!("No valid datanode candidates found, skipping GC cycle");
+            return Ok(Default::default());
+        }
+
+        // Step 4: Process datanodes concurrently with limited parallelism
+        let report = self
+            .parallel_process_datanodes(datanode_to_candidates)
+            .await;
+
+        let duration = start_time.elapsed();
+        info!(
+            "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
+            report.per_datanode_reports.len(), // Reuse field for datanode count
+            report.failed_datanodes.len(),
+            duration
+        );
+        debug!("Detailed GC Job Report: {report:#?}");
+
+        Ok(report)
+    }
+
+    /// Find related regions that might share files with the candidate regions.
+    /// Currently returns the same regions since repartition is not implemented yet.
+    /// TODO(discord9): When repartition is implemented, this should also find src/dst regions
+    /// that might share files with the candidate regions.
+    pub(crate) async fn find_related_regions(
+        &self,
+        candidate_region_ids: &[RegionId],
+    ) -> Result<HashMap<RegionId, Vec<RegionId>>> {
+        Ok(candidate_region_ids.iter().map(|&r| (r, vec![r])).collect())
+    }
+
+    /// Aggregate GC candidates by their corresponding datanode peer.
+    pub(crate) async fn aggregate_candidates_by_datanode(
+        &self,
+        per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
+    ) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
+        let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();
+
+        for (table_id, candidates) in per_table_candidates {
+            if candidates.is_empty() {
+                continue;
+            }
+
+            // Get table route information to map regions to peers
+            let (phy_table_id, table_peer) = self.ctx.get_table_route(table_id).await?;
+
+            if phy_table_id != table_id {
+                // Skip logical tables
+                continue;
+            }
+
+            let region_to_peer = table_peer
+                .region_routes
+                .iter()
+                .filter_map(|r| {
+                    r.leader_peer
+                        .as_ref()
+                        .map(|peer| (r.region.id, peer.clone()))
+                })
+                .collect::<HashMap<_, _>>();
+
+            for candidate in candidates {
+                if let Some(peer) = region_to_peer.get(&candidate.region_id) {
+                    datanode_to_candidates
+                        .entry(peer.clone())
+                        .or_default()
+                        .push((table_id, candidate));
+                } else {
+                    warn!(
+                        "Skipping region {} for table {}: no leader peer found",
+                        candidate.region_id, table_id
+                    );
+                }
+            }
+        }
+
+        info!(
+            "Aggregated GC candidates for {} datanodes",
+            datanode_to_candidates.len()
+        );
+        Ok(datanode_to_candidates)
+    }
+
+    /// Process multiple datanodes concurrently with limited parallelism.
+    pub(crate) async fn parallel_process_datanodes(
+        &self,
+        datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
+    ) -> GcJobReport {
+        let mut report = GcJobReport::default();
+
+        // Create a stream of datanode GC tasks with limited concurrency
+        let results: Vec<_> = futures::stream::iter(
+            datanode_to_candidates
+                .into_iter()
+                .filter(|(_, candidates)| !candidates.is_empty()),
+        )
+        .map(|(peer, candidates)| {
+            let scheduler = self;
+            let peer_clone = peer.clone();
+            async move {
+                (
+                    peer,
+                    scheduler.process_datanode_gc(peer_clone, candidates).await,
+                )
+            }
+        })
+        .buffer_unordered(self.config.max_concurrent_tables) // Reuse table concurrency limit for datanodes
+        .collect()
+        .await;
+
+        // Process all datanode GC results and collect regions that need retry from table reports
+        for (peer, result) in results {
+            match result {
+                Ok(dn_report) => {
+                    report.per_datanode_reports.insert(peer.id, dn_report);
+                }
+                Err(e) => {
+                    error!("Failed to process datanode GC for peer {}: {:#?}", peer, e);
+                    // Note: We don't have a direct way to map peer to table_id here,
+                    // so we just log the error. The table_reports will contain individual region failures.
+                    report.failed_datanodes.entry(peer.id).or_default().push(e);
+                }
+            }
+        }
+
+        report
+    }
+
+    /// Process GC for a single datanode with all its candidate regions.
+    /// Returns the table reports for this datanode.
+    pub(crate) async fn process_datanode_gc(
+        &self,
+        peer: Peer,
+        candidates: Vec<(TableId, GcCandidate)>,
+    ) -> Result<GcReport> {
+        info!(
+            "Starting GC for datanode {} with {} candidate regions",
+            peer,
+            candidates.len()
+        );
+
+        if candidates.is_empty() {
+            return Ok(Default::default());
+        }
+
+        let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();
+
+        let all_related_regions = self.find_related_regions(&all_region_ids).await?;
+
+        let (region_to_peer, _) = self
+            .discover_datanodes_for_regions(&all_related_regions.keys().cloned().collect_vec())
+            .await?;
+
+        // Step 1: Get file references for all regions on this datanode
+        let file_refs_manifest = self
+            .ctx
+            .get_file_references(
+                &all_region_ids,
+                all_related_regions,
+                &region_to_peer,
+                self.config.mailbox_timeout,
+            )
+            .await?;
+
+        // Step 2: Create a single GcRegionProcedure for all regions on this datanode
+        let (gc_report, fully_listed_regions) = {
+            // Partition regions into full listing and fast listing in a single pass
+
+            let mut batch_full_listing_decisions =
+                self.batch_should_use_full_listing(&all_region_ids).await;
+
+            let need_full_list_regions = batch_full_listing_decisions
+                .iter()
+                .filter_map(
+                    |(&region_id, &need_full)| {
+                        if need_full { Some(region_id) } else { None }
+                    },
+                )
+                .collect_vec();
+            let mut fast_list_regions = batch_full_listing_decisions
+                .iter()
+                .filter_map(
+                    |(&region_id, &need_full)| {
+                        if !need_full { Some(region_id) } else { None }
+                    },
+                )
+                .collect_vec();
+
+            let mut combined_report = GcReport::default();
+
+            // First process regions that can fast list
+            if !fast_list_regions.is_empty() {
+                match self
+                    .ctx
+                    .gc_regions(
+                        peer.clone(),
+                        &fast_list_regions,
+                        &file_refs_manifest,
+                        false,
+                        self.config.mailbox_timeout,
+                    )
+                    .await
+                {
+                    Ok(report) => combined_report.merge(report),
+                    Err(e) => {
+                        error!(
+                            "Failed to GC regions {:?} on datanode {}: {}",
+                            fast_list_regions, peer, e
+                        );
+
+                        // Add to need_retry_regions since it failed
+                        combined_report
+                            .need_retry_regions
+                            .extend(fast_list_regions.clone().into_iter());
+                    }
+                }
+            }
+
+            if !need_full_list_regions.is_empty() {
+                match self
+                    .ctx
+                    .gc_regions(
+                        peer.clone(),
+                        &need_full_list_regions,
+                        &file_refs_manifest,
+                        true,
+                        self.config.mailbox_timeout,
+                    )
+                    .await
+                {
+                    Ok(report) => combined_report.merge(report),
+                    Err(e) => {
+                        error!(
+                            "Failed to GC regions {:?} on datanode {}: {}",
+                            need_full_list_regions, peer, e
+                        );
+
+                        // Add to need_retry_regions since it failed
+                        combined_report
+                            .need_retry_regions
+                            .extend(need_full_list_regions.clone());
+                    }
+                }
+            }
+            let fully_listed_regions = need_full_list_regions
+                .into_iter()
+                .filter(|r| !combined_report.need_retry_regions.contains(r))
+                .collect::<HashSet<_>>();
+
+            (combined_report, fully_listed_regions)
+        };
+
+        // Step 3: Process the combined GC report and update table reports
+        for region_id in &all_region_ids {
+            self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
+                .await;
+        }
+
+        info!(
+            "Completed GC for datanode {}: {} regions processed",
+            peer,
+            all_region_ids.len()
+        );
+
+        Ok(gc_report)
+    }
+
+    /// Discover datanodes for the given regions (and their related regions) by fetching table routes in batches.
+    /// Returns mappings from region to peers (leader, Vec<Peer> of followers) and peer to regions.
+    async fn discover_datanodes_for_regions(
+        &self,
+        regions: &[RegionId],
+    ) -> Result<(Region2Peers, Peer2Regions)> {
+        let all_related_regions = self
+            .find_related_regions(regions)
+            .await?
+            .into_iter()
+            .flat_map(|(k, mut v)| {
+                v.push(k);
+                v
+            })
+            .collect_vec();
+        let mut region_to_peer = HashMap::new();
+        let mut peer_to_regions = HashMap::new();
+
+        // Group regions by table ID for batch processing
+        let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
+        for region_id in all_related_regions {
+            let table_id = region_id.table_id();
+            table_to_regions
+                .entry(table_id)
+                .or_default()
+                .push(region_id);
+        }
+
+        // Process each table's regions together for efficiency
+        for (table_id, table_regions) in table_to_regions {
+            match self.ctx.get_table_route(table_id).await {
+                Ok((_phy_table_id, table_route)) => {
+                    self.get_table_regions_peer(
+                        &table_route,
+                        &table_regions,
+                        &mut region_to_peer,
+                        &mut peer_to_regions,
+                    );
+                }
+                Err(e) => {
+                    // Continue with other tables instead of failing completely
+                    // TODO(discord9): consider failing here instead
+                    warn!(
+                        "Failed to get table route for table {}: {}, skipping its regions",
+                        table_id, e
+                    );
+                    continue;
+                }
+            }
+        }
+
+        Ok((region_to_peer, peer_to_regions))
+    }
+
+    /// Process regions for a single table to find their current leader peers.
+    fn get_table_regions_peer(
+        &self,
+        table_route: &PhysicalTableRouteValue,
+        table_regions: &[RegionId],
+        region_to_peer: &mut Region2Peers,
+        peer_to_regions: &mut Peer2Regions,
+    ) {
+        for &region_id in table_regions {
+            let mut found = false;
+
+            // Find the region in the table route
+            for region_route in &table_route.region_routes {
+                if region_route.region.id == region_id
+                    && let Some(leader_peer) = &region_route.leader_peer
+                {
+                    region_to_peer.insert(
+                        region_id,
+                        (leader_peer.clone(), region_route.follower_peers.clone()),
+                    );
+                    peer_to_regions
+                        .entry(leader_peer.clone())
+                        .or_default()
+                        .insert(region_id);
+                    found = true;
+                    break;
+                }
+            }
+
+            if !found {
+                warn!(
+                    "Failed to find region {} in table route or no leader peer found",
+                    region_id,
+                );
+            }
+        }
+    }
+
+    async fn batch_should_use_full_listing(
+        &self,
+        region_ids: &[RegionId],
+    ) -> HashMap<RegionId, bool> {
+        let mut result = HashMap::new();
+        let mut gc_tracker = self.region_gc_tracker.lock().await;
+        let now = Instant::now();
+        for &region_id in region_ids {
+            let use_full_listing = {
+                if let Some(gc_info) = gc_tracker.get(&region_id) {
+                    if let Some(last_full_listing) = gc_info.last_full_listing_time {
+                        // Check whether the cooldown interval since the last full listing has passed
+                        let elapsed = now.duration_since(last_full_listing);
+                        elapsed >= self.config.full_file_listing_interval
+                    } else {
+                        // Never did full listing for this region, do it now
+                        true
+                    }
+                } else {
+                    // First GC for this region: skip the full listing this time
+                    gc_tracker.insert(
+                        region_id,
+                        RegionGcInfo {
+                            last_gc_time: now,
+                            last_full_listing_time: Some(now),
+                        },
+                    );
+                    false
+                }
+            };
+            result.insert(region_id, use_full_listing);
+        }
+        result
+    }
+}
diff --git a/src/meta-srv/src/gc/options.rs b/src/meta-srv/src/gc/options.rs
new file mode 100644
index 0000000000..02ed25323a
--- /dev/null
+++ b/src/meta-srv/src/gc/options.rs
@@ -0,0 +1,171 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use snafu::ensure;
+
+use crate::error::{self, Result};
+
+/// The interval of the gc ticker.
+#[allow(unused)]
+pub(crate) const TICKER_INTERVAL: Duration = Duration::from_secs(60 * 5);
+
+/// Configuration for GC operations.
+///
+/// TODO(discord9): don't expose most of these options to users for now, until the GC scheduler is fully stable.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(default)]
+pub struct GcSchedulerOptions {
+    /// Whether GC is enabled. Defaults to false.
+    /// If set to false, no GC will be performed, and potentially some
+    /// files on datanodes will never be deleted.
+    pub enable: bool,
+    /// Maximum number of tables to process concurrently.
+    pub max_concurrent_tables: usize,
+    /// Maximum number of retries per region when GC fails.
+    pub max_retries_per_region: usize,
+    /// Concurrency for region GC within a table.
+    pub region_gc_concurrency: usize,
+    /// Backoff duration between retries.
+    pub retry_backoff_duration: Duration,
+    /// Minimum region size threshold for GC (in bytes).
+    pub min_region_size_threshold: u64,
+    /// Weight for SST file count in GC scoring.
+    pub sst_count_weight: f64,
+    /// Weight for removed-file count in GC scoring.
+    pub file_removed_count_weight: f64,
+    /// Cooldown period between GC operations on the same region.
+    pub gc_cooldown_period: Duration,
+    /// Maximum number of regions to select for GC per table.
+    pub regions_per_table_threshold: usize,
+    /// Timeout duration for mailbox communication with datanodes.
+    pub mailbox_timeout: Duration,
+    /// Interval for performing full file listing during GC to find orphan files.
+    /// Full file listing is expensive but necessary to clean up orphan files.
+    /// Set to a larger value (e.g., 24 hours) to balance performance and cleanup.
+    /// Every Nth GC cycle will use full file listing, where N = full_file_listing_interval / TICKER_INTERVAL.
+    pub full_file_listing_interval: Duration,
+    /// Interval for cleaning up stale region entries from the GC tracker.
+    /// This removes entries for regions that no longer exist (e.g., after table drops).
+    /// Set to a larger value (e.g., 6 hours) since this is just for memory cleanup.
+    pub tracker_cleanup_interval: Duration,
+}
+
+impl Default for GcSchedulerOptions {
+    fn default() -> Self {
+        Self {
+            enable: false,
+            max_concurrent_tables: 10,
+            max_retries_per_region: 3,
+            retry_backoff_duration: Duration::from_secs(5),
+            region_gc_concurrency: 16,
+            min_region_size_threshold: 100 * 1024 * 1024, // 100MB
+            sst_count_weight: 1.0,
+            file_removed_count_weight: 0.5,
+            gc_cooldown_period: Duration::from_secs(60 * 5), // 5 minutes
+            regions_per_table_threshold: 20, // Select top 20 regions per table
+            mailbox_timeout: Duration::from_secs(60), // 60 seconds
+            // Perform full file listing every 24 hours to find orphan files
+            full_file_listing_interval: Duration::from_secs(60 * 60 * 24),
+            // Clean up stale tracker entries every 6 hours
+            tracker_cleanup_interval: Duration::from_secs(60 * 60 * 6),
+        }
+    }
+}
+
+impl GcSchedulerOptions {
+    /// Validates the configuration options.
+    pub fn validate(&self) -> Result<()> {
+        ensure!(
+            self.max_concurrent_tables > 0,
+            error::InvalidArgumentsSnafu {
+                err_msg: "max_concurrent_tables must be greater than 0",
+            }
+        );
+
+        ensure!(
+            self.max_retries_per_region > 0,
+            error::InvalidArgumentsSnafu {
+                err_msg: "max_retries_per_region must be greater than 0",
+            }
+        );
+
+        ensure!(
+            self.region_gc_concurrency > 0,
+            error::InvalidArgumentsSnafu {
+                err_msg: "region_gc_concurrency must be greater than 0",
+            }
+        );
+
+        ensure!(
+            !self.retry_backoff_duration.is_zero(),
+            error::InvalidArgumentsSnafu {
+                err_msg: "retry_backoff_duration must be greater than 0",
+            }
+        );
+
+        ensure!(
+            self.sst_count_weight >= 0.0,
+            error::InvalidArgumentsSnafu {
+                err_msg: "sst_count_weight must be non-negative",
+            }
+        );
+
+        ensure!(
+            self.file_removed_count_weight >= 0.0,
+            error::InvalidArgumentsSnafu {
+                err_msg: "file_removed_count_weight must be non-negative",
+            }
+        );
+
+        ensure!(
+            !self.gc_cooldown_period.is_zero(),
+            error::InvalidArgumentsSnafu {
+                err_msg: "gc_cooldown_period must be greater than 0",
+            }
+        );
+
+        ensure!(
+            self.regions_per_table_threshold > 0,
+            error::InvalidArgumentsSnafu {
+                err_msg: "regions_per_table_threshold must be greater than 0",
+            }
+        );
+
+        ensure!(
+            !self.mailbox_timeout.is_zero(),
+            error::InvalidArgumentsSnafu {
+                err_msg: "mailbox_timeout must be greater than 0",
+            }
+        );
+
+        ensure!(
+            !self.full_file_listing_interval.is_zero(),
+            error::InvalidArgumentsSnafu {
+                err_msg: "full_file_listing_interval must be greater than 0",
+            }
+        );
+
+        ensure!(
+            !self.tracker_cleanup_interval.is_zero(),
+            error::InvalidArgumentsSnafu {
+                err_msg: "tracker_cleanup_interval must be greater than 0",
+            }
+        );
+
+        Ok(())
+    }
+}
diff --git a/src/meta-srv/src/gc/scheduler.rs b/src/meta-srv/src/gc/scheduler.rs
new file mode 100644
index 0000000000..e3ed3834bb
--- /dev/null
+++ b/src/meta-srv/src/gc/scheduler.rs
@@ -0,0 +1,162 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+use std::time::Instant;
+
+use common_meta::DatanodeId;
+use common_meta::key::TableMetadataManagerRef;
+use common_procedure::ProcedureManagerRef;
+use common_telemetry::{error, info};
+use store_api::storage::GcReport;
+use tokio::sync::Mutex;
+use tokio::sync::mpsc::{Receiver, Sender};
+
+use crate::cluster::MetaPeerClientRef;
+use crate::define_ticker;
+use crate::error::{Error, Result};
+use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
+use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
+use crate::gc::tracker::RegionGcTracker;
+use crate::service::mailbox::MailboxRef;
+
+/// Report for a GC job.
+#[derive(Debug, Default)]
+pub struct GcJobReport {
+    pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
+    pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
+}
+impl GcJobReport {
+    pub fn merge(&mut self, mut other: GcJobReport) {
+        // merge per_datanode_reports & failed_datanodes
+        for (dn_id, report) in other.per_datanode_reports {
+            let self_report = self.per_datanode_reports.entry(dn_id).or_default();
+            self_report.merge(report);
+        }
+        let all_failed_dn_ids = self
+            .failed_datanodes
+            .keys()
+            .cloned()
+            .chain(other.failed_datanodes.keys().cloned())
+            .collect::<HashSet<_>>();
+        for dn_id in all_failed_dn_ids {
+            let entry = self.failed_datanodes.entry(dn_id).or_default();
+            if let Some(other_errors) = other.failed_datanodes.remove(&dn_id) {
+                entry.extend(other_errors);
+            }
+        }
+        self.failed_datanodes
+            .retain(|dn_id, _| !self.per_datanode_reports.contains_key(dn_id));
+    }
+}
+
+/// [`Event`] represents various types of events that can be processed by the gc ticker.
+///
+/// Variants:
+/// - `Tick`: This event is used to trigger gc periodically.
+pub(crate) enum Event {
+    Tick,
+}
+
+#[allow(unused)]
+pub(crate) type GcTickerRef = Arc<GcTicker>;
+
+define_ticker!(
+    /// [GcTicker] is used to trigger gc periodically.
+    GcTicker,
+    event_type = Event,
+    event_value = Event::Tick
+);
+
+/// [`GcScheduler`] is used to periodically trigger garbage collection on datanodes.
+pub struct GcScheduler {
+    pub(crate) ctx: Arc<dyn SchedulerCtx>,
+    /// The receiver of events.
+    pub(crate) receiver: Receiver<Event>,
+    /// GC configuration.
+    pub(crate) config: GcSchedulerOptions,
+    /// Tracks the last GC time for regions.
+    pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
+    /// Last time the tracker was cleaned up.
+    pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
+}
+
+impl GcScheduler {
+    /// Creates a new [`GcScheduler`] with custom configuration.
+    pub(crate) fn new_with_config(
+        table_metadata_manager: TableMetadataManagerRef,
+        procedure_manager: ProcedureManagerRef,
+        meta_peer_client: MetaPeerClientRef,
+        mailbox: MailboxRef,
+        server_addr: String,
+        config: GcSchedulerOptions,
+    ) -> Result<(Self, GcTicker)> {
+        // Validate configuration before creating the scheduler
+        config.validate()?;
+
+        let (tx, rx) = Self::channel();
+        let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
+        let gc_trigger = Self {
+            ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
+                table_metadata_manager,
+                procedure_manager,
+                meta_peer_client,
+                mailbox,
+                server_addr,
+            )?),
+            receiver: rx,
+            config,
+            region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
+            last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
+        };
+        Ok((gc_trigger, gc_ticker))
+    }
+
+    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
+        tokio::sync::mpsc::channel(8)
+    }
+
+    /// Starts the gc trigger.
+    pub fn try_start(mut self) -> Result<()> {
+        common_runtime::spawn_global(async move { self.run().await });
+        info!("GC trigger started");
+        Ok(())
+    }
+
+    pub(crate) async fn run(&mut self) {
+        while let Some(event) = self.receiver.recv().await {
+            match event {
+                Event::Tick => {
+                    info!("Received gc tick");
+                    if let Err(e) = self.handle_tick().await {
+                        error!("Failed to handle gc tick: {}", e);
+                    }
+                }
+            }
+        }
+    }
+
+    pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
+        info!("Start to trigger gc");
+        let report = self.trigger_gc().await?;
+
+        // Periodically clean up stale tracker entries
+        self.cleanup_tracker_if_needed().await?;
+
+        info!("Finished gc trigger");
+
+        Ok(report)
+    }
+}
diff --git a/src/meta-srv/src/gc/tracker.rs b/src/meta-srv/src/gc/tracker.rs
new file mode 100644
index 0000000000..a5d6757c2c
--- /dev/null
+++ b/src/meta-srv/src/gc/tracker.rs
@@ -0,0 +1,129 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::time::Instant;
+
+use common_telemetry::info;
+use store_api::storage::RegionId;
+
+use crate::error::Result;
+use crate::gc::scheduler::GcScheduler;
+
+/// Tracks GC timing information for a region.
+#[derive(Debug, Clone)]
+pub(crate) struct RegionGcInfo {
+    /// Last time a regular GC was performed on this region.
+    pub(crate) last_gc_time: Instant,
+    /// Last time a full file listing GC was performed on this region.
+    pub(crate) last_full_listing_time: Option<Instant>,
+}
+
+impl RegionGcInfo {
+    pub(crate) fn new(last_gc_time: Instant) -> Self {
+        Self {
+            last_gc_time,
+            last_full_listing_time: None,
+        }
+    }
+}
+
+/// Tracks the last GC time for regions to implement cooldown.
+pub(crate) type RegionGcTracker = HashMap<RegionId, RegionGcInfo>;
+
+impl GcScheduler {
+    /// Clean up stale entries from the region GC tracker if enough time has passed.
+    /// This removes entries for regions that no longer exist in the current table routes.
+    pub(crate) async fn cleanup_tracker_if_needed(&self) -> Result<()> {
+        let last_cleanup = *self.last_tracker_cleanup.lock().await;
+        let now = Instant::now();
+
+        // Check if enough time has passed since last cleanup
+        if now.duration_since(last_cleanup) < self.config.tracker_cleanup_interval {
+            return Ok(());
+        }
+
+        info!("Starting region GC tracker cleanup");
+        let cleanup_start = Instant::now();
+
+        // Get all current region IDs from table routes
+        let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
+        let mut current_regions = HashSet::new();
+        for region_stats in table_to_region_stats.values() {
+            for region_stat in region_stats {
+                current_regions.insert(region_stat.id);
+            }
+        }
+
+        // Remove stale entries from tracker
+        let mut tracker = self.region_gc_tracker.lock().await;
+        let initial_count = tracker.len();
+        tracker.retain(|region_id, _| current_regions.contains(region_id));
+        let removed_count = initial_count - tracker.len();
+
+        *self.last_tracker_cleanup.lock().await = now;
+
+        info!(
+            "Completed region GC tracker cleanup: removed {} stale entries out of {} total (retained {}). Duration: {:?}",
+            removed_count,
+            initial_count,
+            tracker.len(),
+            cleanup_start.elapsed()
+        );
+
+        Ok(())
+    }
+
+    /// Determine if full file listing should be used for a region based on the last full listing time.
+    pub(crate) async fn should_use_full_listing(&self, region_id: RegionId) -> bool {
+        let gc_tracker = self.region_gc_tracker.lock().await;
+        let now = Instant::now();
+
+        if let Some(gc_info) = gc_tracker.get(&region_id) {
+            if let Some(last_full_listing) = gc_info.last_full_listing_time {
+                let elapsed = now.duration_since(last_full_listing);
+                elapsed >= self.config.full_file_listing_interval
+            } else {
+                // Never did full listing for this region, do it now
+                true
+            }
+        } else {
+            // First time GC for this region, do full listing
+            true
+        }
+    }
+
+    pub(crate) async fn update_full_listing_time(
+        &self,
+        region_id: RegionId,
+        did_full_listing: bool,
+    ) {
+        let mut gc_tracker = self.region_gc_tracker.lock().await;
+        let now = Instant::now();
+
+        gc_tracker
+            .entry(region_id)
+            .and_modify(|info| {
+                if did_full_listing {
+                    info.last_full_listing_time = Some(now);
+                }
+                info.last_gc_time = now;
+            })
+            .or_insert_with(|| RegionGcInfo {
+                last_gc_time: now,
+                // Avoid needing a full listing on the first GC
+                last_full_listing_time: Some(now),
+            });
+    }
+}
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 39fbf66ccb..c454bc1ca5 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -67,6 +67,7 @@ use crate::error::{
     StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
 };
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
+use crate::gc::{GcSchedulerOptions, GcTickerRef};
 use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
 use crate::procedure::ProcedureManagerListenerAdapter;
 use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
@@ -207,6 +208,8 @@ pub struct MetasrvOptions {
     pub event_recorder: EventRecorderOptions,
     /// The stats persistence options.
     pub stats_persistence: StatsPersistenceOptions,
+    /// The GC scheduler options.
+    pub gc: GcSchedulerOptions,
 }
 
 impl fmt::Debug for MetasrvOptions {
@@ -303,6 +306,7 @@ impl Default for MetasrvOptions {
             node_max_idle_time: Duration::from_secs(24 * 60 * 60),
             event_recorder: EventRecorderOptions::default(),
             stats_persistence: StatsPersistenceOptions::default(),
+            gc: GcSchedulerOptions::default(),
         }
     }
 }
@@ -524,6 +528,7 @@ pub struct Metasrv {
     table_id_sequence: SequenceRef,
     reconciliation_manager: ReconciliationManagerRef,
     resource_stat: ResourceStatRef,
+    gc_ticker: Option<GcTickerRef>,
     plugins: Plugins,
 }
 
@@ -584,6 +589,9 @@ impl Metasrv {
         if let Some(region_flush_trigger) = &self.region_flush_ticker {
             leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
         }
+        if let Some(gc_ticker) = &self.gc_ticker {
+            leadership_change_notifier.add_listener(gc_ticker.clone() as _);
+        }
         if let Some(customizer) = self.plugins.get::() {
             customizer.customize(&mut leadership_change_notifier);
         }
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 5a33dc9c4f..04b5bd02c6 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -56,6 +56,7 @@ use crate::cache_invalidator::MetasrvCacheInvalidator;
 use crate::cluster::MetaPeerClientRef;
 use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
 use crate::events::EventHandlerImpl;
+use crate::gc::GcScheduler;
 use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
 use crate::handler::failure_handler::RegionFailureHandler;
 use crate::handler::flow_state_handler::FlowStateHandler;
@@ -458,6 +459,22 @@ impl MetasrvBuilder {
             None
         };
 
+        let gc_ticker = if options.gc.enable {
+            let (gc_scheduler, gc_ticker) = GcScheduler::new_with_config(
+                table_metadata_manager.clone(),
+                procedure_manager.clone(),
+                meta_peer_client.clone(),
+                mailbox.clone(),
+                options.grpc.server_addr.clone(),
+                options.gc.clone(),
+            )?;
+            gc_scheduler.try_start()?;
+
+            Some(Arc::new(gc_ticker))
+        } else {
+            None
+        };
+
         let customized_region_lease_renewer = plugins
             .as_ref()
             .and_then(|plugins| plugins.get::());
@@ -562,6 +579,7 @@ impl MetasrvBuilder {
             reconciliation_manager,
             topic_stats_registry,
             resource_stat: Arc::new(resource_stat),
+            gc_ticker,
         })
     }
 }
diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs
index b9285c183b..04d59f2d99 100644
--- a/src/mito2/src/gc.rs
+++ b/src/mito2/src/gc.rs
@@ -30,7 +30,7 @@ use common_telemetry::{debug, error, info, warn};
 use common_time::Timestamp;
 use object_store::{Entry, Lister};
 use serde::{Deserialize, Serialize};
-use snafu::{OptionExt, ResultExt as _};
+use snafu::ResultExt as _;
 use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
 use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
 use tokio_stream::StreamExt;
@@ -212,35 +212,9 @@ impl LocalGcWorker {
     /// Outdated regions are added to `outdated_regions` set, which means their manifest version in
     /// self.file_ref_manifest is older than the current manifest version on datanode.
     /// so they need to retry GC later by metasrv with updated tmp ref files.
-    pub async fn read_tmp_ref_files(
-        &self,
-        outdated_regions: &mut HashSet<RegionId>,
-    ) -> Result<HashMap<RegionId, HashSet<FileId>>> {
-        // verify manifest version before reading tmp ref files
-        for (region_id, mito_region) in &self.regions {
-            let current_version = mito_region.manifest_ctx.manifest_version().await;
-            if &current_version
-                > self
-                    .file_ref_manifest
-                    .manifest_version
-                    .get(region_id)
-                    .with_context(|| UnexpectedSnafu {
-                        reason: format!(
-                            "Region {} not found in tmp ref manifest version map",
-                            region_id
-                        ),
-                    })?
-            {
-                outdated_regions.insert(*region_id);
-            }
-        }
-
+    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileId>>> {
         let mut tmp_ref_files = HashMap::new();
         for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
-            if outdated_regions.contains(region_id) {
-                // skip outdated regions
-                continue;
-            }
             tmp_ref_files
                 .entry(*region_id)
                 .or_insert_with(HashSet::new)
@@ -259,9 +233,8 @@ impl LocalGcWorker {
         info!("LocalGcWorker started");
         let now = std::time::Instant::now();
 
-        let mut outdated_regions = HashSet::new();
         let mut deleted_files = HashMap::new();
-        let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
+        let tmp_ref_files = self.read_tmp_ref_files().await?;
         for (region_id, region) in &self.regions {
             let per_region_time = std::time::Instant::now();
             if region.manifest_ctx.current_state() == RegionRoleState::Follower {
@@ -291,7 +264,7 @@ impl LocalGcWorker {
         );
         let report = GcReport {
             deleted_files,
-            need_retry_regions: outdated_regions.into_iter().collect(),
+            need_retry_regions: HashSet::new(),
         };
         Ok(report)
     }
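
For reference, a minimal standalone sketch of the scoring rule implemented by `calculate_gc_score` and the size/score thresholds applied in `select_gc_candidates`. The `RegionStatSketch` struct and the sample numbers below are illustrative stand-ins, not the real `RegionStat` type; the weights and the 100MB threshold mirror `GcSchedulerOptions::default()` from this patch.

    // Illustrative sketch only; simplified stand-in for `RegionStat` with just the
    // fields the scoring rule reads.
    struct RegionStatSketch {
        sst_num: u64,
        file_removed_cnt: u64,
        approximate_bytes: u64,
    }

    // score = sst_num * sst_count_weight + file_removed_cnt * file_removed_count_weight
    fn gc_score(stat: &RegionStatSketch, sst_count_weight: f64, file_removed_count_weight: f64) -> f64 {
        stat.sst_num as f64 * sst_count_weight
            + stat.file_removed_cnt as f64 * file_removed_count_weight
    }

    fn main() {
        // Defaults from `GcSchedulerOptions::default()`.
        let (sst_w, removed_w) = (1.0, 0.5);
        let min_region_size: u64 = 100 * 1024 * 1024; // 100MB

        let stat = RegionStatSketch {
            sst_num: 40,
            file_removed_cnt: 12,
            approximate_bytes: 512 * 1024 * 1024,
        };

        // Regions smaller than the threshold are skipped before scoring;
        // only regions with score > 0.0 become GC candidates.
        if stat.approximate_bytes >= min_region_size {
            let score = gc_score(&stat, sst_w, removed_w);
            println!("gc score = {score}"); // 40.0 * 1.0 + 12.0 * 0.5 = 46.0
        }
    }

Per-table candidates are then sorted by this score and capped at `regions_per_table_threshold` (20 by default), and the whole scheduler stays inert unless `gc.enable` is set, since it defaults to false.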