feat: basic gc scheduler (#7263)

* feat: basic gc scheduler

Signed-off-by: discord9 <discord9@163.com>

* refactor: rm dup code

Signed-off-by: discord9 <discord9@163.com>

* docs: todo for cleaner code

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

* feat: rm retry path

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* feat: skip first full listing after metasrv start

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-11-24 15:57:18 +08:00
committed by GitHub
parent c0d0b99a32
commit 52a576cf6d
10 changed files with 1044 additions and 34 deletions

View File

@@ -22,7 +22,14 @@ use store_api::storage::RegionId;
mod candidate;
mod ctx;
mod handler;
mod options;
mod procedure;
mod scheduler;
mod tracker;
pub(crate) use options::GcSchedulerOptions;
pub(crate) use scheduler::{GcScheduler, GcTickerRef};
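/// Maps a region to its leader peer and follower peers.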
pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;

View File

@@ -23,6 +23,7 @@ use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::error::Result;
use crate::gc::scheduler::GcScheduler;
/// Represents a region candidate for GC with its priority score.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -46,3 +47,88 @@ impl GcCandidate {
self.score.into_inner()
}
}
impl GcScheduler {
/// Calculate GC priority score for a region based on various metrics.
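///
/// The score is `sst_num * sst_count_weight + file_removed_cnt * file_removed_count_weight`.
/// For example, with the default weights (1.0 and 0.5), a Mito region with 30 SST files and
/// 10 removed files scores `30.0 * 1.0 + 10.0 * 0.5 = 35.0`.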
fn calculate_gc_score(&self, region_stat: &RegionStat) -> f64 {
let sst_count_score = region_stat.sst_num as f64 * self.config.sst_count_weight;
let file_remove_cnt_score = match &region_stat.region_manifest {
RegionManifestInfo::Mito {
file_removed_cnt, ..
} => *file_removed_cnt as f64 * self.config.file_removed_count_weight,
// The metric engine doesn't track removed files; this branch should also be unreachable since the metric engine doesn't support GC
RegionManifestInfo::Metric { .. } => 0.0,
};
sst_count_score + file_remove_cnt_score
}
/// Filter and score regions that are candidates for GC, grouped by table.
pub(crate) async fn select_gc_candidates(
&self,
table_to_region_stats: &HashMap<TableId, Vec<RegionStat>>,
) -> Result<HashMap<TableId, Vec<GcCandidate>>> {
let mut table_candidates: HashMap<TableId, Vec<GcCandidate>> = HashMap::new();
let now = Instant::now();
for (table_id, region_stats) in table_to_region_stats {
let mut candidates = Vec::new();
let tracker = self.region_gc_tracker.lock().await;
for region_stat in region_stats {
if region_stat.role != RegionRole::Leader {
continue;
}
// Skip regions that are too small
if region_stat.approximate_bytes < self.config.min_region_size_threshold {
continue;
}
// Skip regions that are in cooldown period
if let Some(gc_info) = tracker.get(&region_stat.id)
&& now.duration_since(gc_info.last_gc_time) < self.config.gc_cooldown_period
{
debug!("Skipping region {} due to cooldown", region_stat.id);
continue;
}
let score = self.calculate_gc_score(region_stat);
debug!(
"Region {} (table {}) has GC score {:.4}",
region_stat.id, table_id, score
);
// Only consider regions with a meaningful score
if score > 0.0 {
candidates.push(GcCandidate::new(region_stat.id, score, region_stat.clone()));
}
}
// Sort candidates by score in descending order and take top N
candidates.sort_by(|a, b| b.score.cmp(&a.score));
let top_candidates: Vec<GcCandidate> = candidates
.into_iter()
.take(self.config.regions_per_table_threshold)
.collect();
if !top_candidates.is_empty() {
info!(
"Selected {} GC candidates for table {} (top {} out of all qualified)",
top_candidates.len(),
table_id,
self.config.regions_per_table_threshold
);
table_candidates.insert(*table_id, top_candidates);
}
}
info!(
"Selected GC candidates for {} tables",
table_candidates.len()
);
Ok(table_candidates)
}
}

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(discord9): remove this once gc scheduler is fully merged
#![allow(unused)]
use std::collections::{HashMap, HashSet};
use std::time::Duration;

View File

@@ -0,0 +1,459 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use itertools::Itertools;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;
use tokio::time::sleep;
use crate::error::Result;
use crate::gc::candidate::GcCandidate;
use crate::gc::scheduler::{GcJobReport, GcScheduler};
use crate::gc::tracker::RegionGcInfo;
use crate::region;
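/// Maps a region to its leader peer and follower peers.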
pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
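/// Maps a datanode peer to the set of regions it leads.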
pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;
impl GcScheduler {
/// Iterate through all region stats, find regions that might need GC, and send GC instructions to
/// the corresponding datanodes, processing datanodes in parallel with limited concurrency.
pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
let start_time = Instant::now();
info!("Starting GC cycle");
// Step 1: Get all region statistics
let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
info!(
"Fetched region stats for {} tables",
table_to_region_stats.len()
);
// Step 2: Select GC candidates based on our scoring algorithm
let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;
if per_table_candidates.is_empty() {
info!("No GC candidates found, skipping GC cycle");
return Ok(Default::default());
}
// Step 3: Aggregate candidates by datanode
let datanode_to_candidates = self
.aggregate_candidates_by_datanode(per_table_candidates)
.await?;
if datanode_to_candidates.is_empty() {
info!("No valid datanode candidates found, skipping GC cycle");
return Ok(Default::default());
}
// Step 4: Process datanodes concurrently with limited parallelism
let report = self
.parallel_process_datanodes(datanode_to_candidates)
.await;
let duration = start_time.elapsed();
info!(
"Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
report.per_datanode_reports.len(), // Successfully processed datanodes
report.failed_datanodes.len(),
duration
);
debug!("Detailed GC Job Report: {report:#?}");
Ok(report)
}
/// Find related regions that might share files with the candidate regions.
/// Currently maps each region to itself since repartition is not implemented yet.
/// TODO(discord9): When repartition is implemented, this should also find src/dst regions
/// that might share files with the candidate regions.
pub(crate) async fn find_related_regions(
&self,
candidate_region_ids: &[RegionId],
) -> Result<HashMap<RegionId, Vec<RegionId>>> {
Ok(candidate_region_ids.iter().map(|&r| (r, vec![r])).collect())
}
/// Aggregate GC candidates by their corresponding datanode peer.
pub(crate) async fn aggregate_candidates_by_datanode(
&self,
per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
) -> Result<HashMap<Peer, Vec<(TableId, GcCandidate)>>> {
let mut datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>> = HashMap::new();
for (table_id, candidates) in per_table_candidates {
if candidates.is_empty() {
continue;
}
// Get table route information to map regions to peers
let (phy_table_id, table_peer) = self.ctx.get_table_route(table_id).await?;
if phy_table_id != table_id {
// Skip logical tables
continue;
}
let region_to_peer = table_peer
.region_routes
.iter()
.filter_map(|r| {
r.leader_peer
.as_ref()
.map(|peer| (r.region.id, peer.clone()))
})
.collect::<HashMap<RegionId, Peer>>();
for candidate in candidates {
if let Some(peer) = region_to_peer.get(&candidate.region_id) {
datanode_to_candidates
.entry(peer.clone())
.or_default()
.push((table_id, candidate));
} else {
warn!(
"Skipping region {} for table {}: no leader peer found",
candidate.region_id, table_id
);
}
}
}
info!(
"Aggregated GC candidates for {} datanodes",
datanode_to_candidates.len()
);
Ok(datanode_to_candidates)
}
/// Process multiple datanodes concurrently with limited parallelism.
pub(crate) async fn parallel_process_datanodes(
&self,
datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
) -> GcJobReport {
let mut report = GcJobReport::default();
// Create a stream of datanode GC tasks with limited concurrency
let results: Vec<_> = futures::stream::iter(
datanode_to_candidates
.into_iter()
.filter(|(_, candidates)| !candidates.is_empty()),
)
.map(|(peer, candidates)| {
let scheduler = self;
let peer_clone = peer.clone();
async move {
(
peer,
scheduler.process_datanode_gc(peer_clone, candidates).await,
)
}
})
.buffer_unordered(self.config.max_concurrent_tables) // Reuse table concurrency limit for datanodes
.collect()
.await;
// Collect per-datanode GC results and record any failures
for (peer, result) in results {
match result {
Ok(dn_report) => {
report.per_datanode_reports.insert(peer.id, dn_report);
}
Err(e) => {
error!("Failed to process datanode GC for peer {}: {:#?}", peer, e);
// Note: we don't have a direct way to map the peer back to table ids here,
// so the error is recorded per datanode.
report.failed_datanodes.entry(peer.id).or_default().push(e);
}
}
}
report
}
/// Process GC for a single datanode with all its candidate regions.
/// Returns the combined GC report for this datanode.
pub(crate) async fn process_datanode_gc(
&self,
peer: Peer,
candidates: Vec<(TableId, GcCandidate)>,
) -> Result<GcReport> {
info!(
"Starting GC for datanode {} with {} candidate regions",
peer,
candidates.len()
);
if candidates.is_empty() {
return Ok(Default::default());
}
let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();
let all_related_regions = self.find_related_regions(&all_region_ids).await?;
let (region_to_peer, _) = self
.discover_datanodes_for_regions(&all_related_regions.keys().cloned().collect_vec())
.await?;
// Step 1: Get file references for all regions on this datanode
let file_refs_manifest = self
.ctx
.get_file_references(
&all_region_ids,
all_related_regions,
&region_to_peer,
self.config.mailbox_timeout,
)
.await?;
// Step 2: Issue GC for all regions on this datanode, split into fast-listing and full-listing batches
let (gc_report, fully_listed_regions) = {
// Partition regions into those that need a full file listing and those that can use fast listing
let batch_full_listing_decisions =
self.batch_should_use_full_listing(&all_region_ids).await;
let need_full_list_regions = batch_full_listing_decisions
.iter()
.filter_map(
|(&region_id, &need_full)| {
if need_full { Some(region_id) } else { None }
},
)
.collect_vec();
let fast_list_regions = batch_full_listing_decisions
.iter()
.filter_map(
|(&region_id, &need_full)| {
if !need_full { Some(region_id) } else { None }
},
)
.collect_vec();
let mut combined_report = GcReport::default();
// Process fast-listing regions first; the full-listing flag applies to a whole request, so the two groups are sent as separate gc_regions calls
if !fast_list_regions.is_empty() {
match self
.ctx
.gc_regions(
peer.clone(),
&fast_list_regions,
&file_refs_manifest,
false,
self.config.mailbox_timeout,
)
.await
{
Ok(report) => combined_report.merge(report),
Err(e) => {
error!(
"Failed to GC regions {:?} on datanode {}: {}",
fast_list_regions, peer, e
);
// Add to need_retry_regions since it failed
combined_report
.need_retry_regions
.extend(fast_list_regions.clone().into_iter());
}
}
}
if !need_full_list_regions.is_empty() {
match self
.ctx
.gc_regions(
peer.clone(),
&need_full_list_regions,
&file_refs_manifest,
true,
self.config.mailbox_timeout,
)
.await
{
Ok(report) => combined_report.merge(report),
Err(e) => {
error!(
"Failed to GC regions {:?} on datanode {}: {}",
need_full_list_regions, peer, e
);
// Add to need_retry_regions since it failed
combined_report
.need_retry_regions
.extend(need_full_list_regions.clone());
}
}
}
let fully_listed_regions = need_full_list_regions
.into_iter()
.filter(|r| !combined_report.need_retry_regions.contains(r))
.collect::<HashSet<_>>();
(combined_report, fully_listed_regions)
};
// Step 3: Update full-listing timestamps in the GC tracker for all processed regions
for region_id in &all_region_ids {
self.update_full_listing_time(*region_id, fully_listed_regions.contains(region_id))
.await;
}
info!(
"Completed GC for datanode {}: {} regions processed",
peer,
all_region_ids.len()
);
Ok(gc_report)
}
/// Discover datanodes for the given regions (and their related regions) by fetching table routes in batches.
/// Returns mappings from region to peers (leader plus followers) and from peer to regions.
async fn discover_datanodes_for_regions(
&self,
regions: &[RegionId],
) -> Result<(Region2Peers, Peer2Regions)> {
let all_related_regions = self
.find_related_regions(regions)
.await?
.into_iter()
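// Flatten into a single list containing each candidate region plus its related regions.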
.flat_map(|(k, mut v)| {
v.push(k);
v
})
.collect_vec();
let mut region_to_peer = HashMap::new();
let mut peer_to_regions = HashMap::new();
// Group regions by table ID for batch processing
let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
for region_id in all_related_regions {
let table_id = region_id.table_id();
table_to_regions
.entry(table_id)
.or_default()
.push(region_id);
}
// Process each table's regions together for efficiency
for (table_id, table_regions) in table_to_regions {
match self.ctx.get_table_route(table_id).await {
Ok((_phy_table_id, table_route)) => {
self.get_table_regions_peer(
&table_route,
&table_regions,
&mut region_to_peer,
&mut peer_to_regions,
);
}
Err(e) => {
// Continue with other tables instead of failing completely
// TODO(discord9): consider failing here instead
warn!(
"Failed to get table route for table {}: {}, skipping its regions",
table_id, e
);
continue;
}
}
}
Ok((region_to_peer, peer_to_regions))
}
/// Resolve the current leader (and follower) peers for a single table's regions.
fn get_table_regions_peer(
&self,
table_route: &PhysicalTableRouteValue,
table_regions: &[RegionId],
region_to_peer: &mut Region2Peers,
peer_to_regions: &mut Peer2Regions,
) {
for &region_id in table_regions {
let mut found = false;
// Find the region in the table route
for region_route in &table_route.region_routes {
if region_route.region.id == region_id
&& let Some(leader_peer) = &region_route.leader_peer
{
region_to_peer.insert(
region_id,
(leader_peer.clone(), region_route.follower_peers.clone()),
);
peer_to_regions
.entry(leader_peer.clone())
.or_default()
.insert(region_id);
found = true;
break;
}
}
if !found {
warn!(
"Failed to find region {} in table route or no leader peer found",
region_id,
);
}
}
}
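/// Decide, for each region, whether the next GC run should perform a full file listing.
/// Regions seen for the first time are recorded in the tracker with the current time and
/// skip the full listing for this cycle (i.e. the first full listing after metasrv start is skipped).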
async fn batch_should_use_full_listing(
&self,
region_ids: &[RegionId],
) -> HashMap<RegionId, bool> {
let mut result = HashMap::new();
let mut gc_tracker = self.region_gc_tracker.lock().await;
let now = Instant::now();
for &region_id in region_ids {
let use_full_listing = {
if let Some(gc_info) = gc_tracker.get(&region_id) {
if let Some(last_full_listing) = gc_info.last_full_listing_time {
// Check whether the cooldown interval has passed since the last full listing
let elapsed = now.duration_since(last_full_listing);
elapsed >= self.config.full_file_listing_interval
} else {
// Never did full listing for this region, do it now
true
}
} else {
// First GC for this region since metasrv started: record it in the tracker and skip the full listing this time
gc_tracker.insert(
region_id,
RegionGcInfo {
last_gc_time: now,
last_full_listing_time: Some(now),
},
);
false
}
};
result.insert(region_id, use_full_listing);
}
result
}
}

View File

@@ -0,0 +1,171 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::error::{self, Result};
/// The interval of the gc ticker.
#[allow(unused)]
pub(crate) const TICKER_INTERVAL: Duration = Duration::from_secs(60 * 5);
/// Configuration for GC operations.
///
/// TODO(discord9): do not expose most of these options to users for now, until the GC scheduler is fully stable.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcSchedulerOptions {
/// Whether GC is enabled. Defaults to false.
/// If disabled, no GC is performed and some files on datanodes may never be deleted.
pub enable: bool,
/// Maximum number of tables to process concurrently.
pub max_concurrent_tables: usize,
/// Maximum number of retries per region when GC fails.
pub max_retries_per_region: usize,
/// Concurrency for region GC within a table.
pub region_gc_concurrency: usize,
/// Backoff duration between retries.
pub retry_backoff_duration: Duration,
/// Minimum region size threshold for GC (in bytes).
pub min_region_size_threshold: u64,
/// Weight for SST file count in GC scoring.
pub sst_count_weight: f64,
/// Weight for the removed-file count in GC scoring.
pub file_removed_count_weight: f64,
/// Cooldown period between GC operations on the same region.
pub gc_cooldown_period: Duration,
/// Maximum number of regions to select for GC per table.
pub regions_per_table_threshold: usize,
/// Timeout duration for mailbox communication with datanodes.
pub mailbox_timeout: Duration,
/// Interval for performing full file listing during GC to find orphan files.
/// Full file listing is expensive but necessary to clean up orphan files.
/// Set to a larger value (e.g., 24 hours) to balance performance and cleanup.
/// Every Nth GC cycle will use full file listing, where N = full_file_listing_interval / TICKER_INTERVAL.
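/// With the defaults (24-hour interval, 5-minute ticker), that is roughly every 288th cycle.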
pub full_file_listing_interval: Duration,
/// Interval for cleaning up stale region entries from the GC tracker.
/// This removes entries for regions that no longer exist (e.g., after table drops).
/// Set to a larger value (e.g., 6 hours) since this is just for memory cleanup.
pub tracker_cleanup_interval: Duration,
}
impl Default for GcSchedulerOptions {
fn default() -> Self {
Self {
enable: false,
max_concurrent_tables: 10,
max_retries_per_region: 3,
retry_backoff_duration: Duration::from_secs(5),
region_gc_concurrency: 16,
min_region_size_threshold: 100 * 1024 * 1024, // 100MB
sst_count_weight: 1.0,
file_removed_count_weight: 0.5,
gc_cooldown_period: Duration::from_secs(60 * 5), // 5 minutes
regions_per_table_threshold: 20, // Select top 20 regions per table
mailbox_timeout: Duration::from_secs(60), // 60 seconds
// Perform full file listing every 24 hours to find orphan files
full_file_listing_interval: Duration::from_secs(60 * 60 * 24),
// Clean up stale tracker entries every 6 hours
tracker_cleanup_interval: Duration::from_secs(60 * 60 * 6),
}
}
}
impl GcSchedulerOptions {
/// Validates the configuration options.
pub fn validate(&self) -> Result<()> {
ensure!(
self.max_concurrent_tables > 0,
error::InvalidArgumentsSnafu {
err_msg: "max_concurrent_tables must be greater than 0",
}
);
ensure!(
self.max_retries_per_region > 0,
error::InvalidArgumentsSnafu {
err_msg: "max_retries_per_region must be greater than 0",
}
);
ensure!(
self.region_gc_concurrency > 0,
error::InvalidArgumentsSnafu {
err_msg: "region_gc_concurrency must be greater than 0",
}
);
ensure!(
!self.retry_backoff_duration.is_zero(),
error::InvalidArgumentsSnafu {
err_msg: "retry_backoff_duration must be greater than 0",
}
);
ensure!(
self.sst_count_weight >= 0.0,
error::InvalidArgumentsSnafu {
err_msg: "sst_count_weight must be non-negative",
}
);
ensure!(
self.file_removed_count_weight >= 0.0,
error::InvalidArgumentsSnafu {
err_msg: "file_removal_rate_weight must be non-negative",
}
);
ensure!(
!self.gc_cooldown_period.is_zero(),
error::InvalidArgumentsSnafu {
err_msg: "gc_cooldown_period must be greater than 0",
}
);
ensure!(
self.regions_per_table_threshold > 0,
error::InvalidArgumentsSnafu {
err_msg: "regions_per_table_threshold must be greater than 0",
}
);
ensure!(
!self.mailbox_timeout.is_zero(),
error::InvalidArgumentsSnafu {
err_msg: "mailbox_timeout must be greater than 0",
}
);
ensure!(
!self.full_file_listing_interval.is_zero(),
error::InvalidArgumentsSnafu {
err_msg: "full_file_listing_interval must be greater than 0",
}
);
ensure!(
!self.tracker_cleanup_interval.is_zero(),
error::InvalidArgumentsSnafu {
err_msg: "tracker_cleanup_interval must be greater than 0",
}
);
Ok(())
}
}

View File

@@ -0,0 +1,162 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
use common_meta::DatanodeId;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::{error, info};
use store_api::storage::GcReport;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::cluster::MetaPeerClientRef;
use crate::define_ticker;
use crate::error::{Error, Result};
use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
use crate::gc::tracker::RegionGcTracker;
use crate::service::mailbox::MailboxRef;
/// Report for a GC job.
#[derive(Debug, Default)]
pub struct GcJobReport {
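/// Successful GC reports, keyed by datanode id.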
pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
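/// Errors encountered while processing each datanode, keyed by datanode id.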
pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
}
impl GcJobReport {
pub fn merge(&mut self, mut other: GcJobReport) {
// Merge per_datanode_reports and failed_datanodes
for (dn_id, report) in other.per_datanode_reports {
let self_report = self.per_datanode_reports.entry(dn_id).or_default();
self_report.merge(report);
}
let all_failed_dn_ids = self
.failed_datanodes
.keys()
.cloned()
.chain(other.failed_datanodes.keys().cloned())
.collect::<HashSet<_>>();
for dn_id in all_failed_dn_ids {
let entry = self.failed_datanodes.entry(dn_id).or_default();
if let Some(other_errors) = other.failed_datanodes.remove(&dn_id) {
entry.extend(other_errors);
}
}
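// A datanode that ultimately produced a report is no longer counted as failed.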
self.failed_datanodes
.retain(|dn_id, _| !self.per_datanode_reports.contains_key(dn_id));
}
}
/// [`Event`] represents various types of events that can be processed by the gc ticker.
///
/// Variants:
/// - `Tick`: This event is used to trigger gc periodically.
pub(crate) enum Event {
Tick,
}
#[allow(unused)]
pub(crate) type GcTickerRef = Arc<GcTicker>;
define_ticker!(
/// [GcTicker] is used to trigger gc periodically.
GcTicker,
event_type = Event,
event_value = Event::Tick
);
/// [`GcScheduler`] is used to periodically trigger garbage collection on datanodes.
pub struct GcScheduler {
pub(crate) ctx: Arc<dyn SchedulerCtx>,
/// The receiver of events.
pub(crate) receiver: Receiver<Event>,
/// GC configuration.
pub(crate) config: GcSchedulerOptions,
/// Tracks the last GC time for regions.
pub(crate) region_gc_tracker: Arc<Mutex<RegionGcTracker>>,
/// Last time the tracker was cleaned up.
pub(crate) last_tracker_cleanup: Arc<Mutex<Instant>>,
}
impl GcScheduler {
/// Creates a new [`GcScheduler`] with custom configuration.
pub(crate) fn new_with_config(
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
meta_peer_client: MetaPeerClientRef,
mailbox: MailboxRef,
server_addr: String,
config: GcSchedulerOptions,
) -> Result<(Self, GcTicker)> {
// Validate configuration before creating the scheduler
config.validate()?;
let (tx, rx) = Self::channel();
let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx);
let gc_trigger = Self {
ctx: Arc::new(DefaultGcSchedulerCtx::try_new(
table_metadata_manager,
procedure_manager,
meta_peer_client,
mailbox,
server_addr,
)?),
receiver: rx,
config,
region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
};
Ok((gc_trigger, gc_ticker))
}
pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
tokio::sync::mpsc::channel(8)
}
/// Starts the gc trigger.
pub fn try_start(mut self) -> Result<()> {
common_runtime::spawn_global(async move { self.run().await });
info!("GC trigger started");
Ok(())
}
pub(crate) async fn run(&mut self) {
while let Some(event) = self.receiver.recv().await {
match event {
Event::Tick => {
info!("Received gc tick");
if let Err(e) = self.handle_tick().await {
error!("Failed to handle gc tick: {}", e);
}
}
}
}
}
pub(crate) async fn handle_tick(&self) -> Result<GcJobReport> {
info!("Start to trigger gc");
let report = self.trigger_gc().await?;
// Periodically clean up stale tracker entries
self.cleanup_tracker_if_needed().await?;
info!("Finished gc trigger");
Ok(report)
}
}

View File

@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use common_telemetry::info;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::gc::scheduler::GcScheduler;
/// Tracks GC timing information for a region.
#[derive(Debug, Clone)]
pub(crate) struct RegionGcInfo {
/// Last time a regular GC was performed on this region.
pub(crate) last_gc_time: Instant,
/// Last time a full file listing GC was performed on this region.
pub(crate) last_full_listing_time: Option<Instant>,
}
impl RegionGcInfo {
pub(crate) fn new(last_gc_time: Instant) -> Self {
Self {
last_gc_time,
last_full_listing_time: None,
}
}
}
/// Tracks the last GC time for regions to implement cooldown.
pub(crate) type RegionGcTracker = HashMap<RegionId, RegionGcInfo>;
impl GcScheduler {
/// Clean up stale entries from the region GC tracker if enough time has passed.
/// This removes entries for regions that no longer appear in the latest region statistics.
pub(crate) async fn cleanup_tracker_if_needed(&self) -> Result<()> {
let last_cleanup = *self.last_tracker_cleanup.lock().await;
let now = Instant::now();
// Check if enough time has passed since last cleanup
if now.duration_since(last_cleanup) < self.config.tracker_cleanup_interval {
return Ok(());
}
info!("Starting region GC tracker cleanup");
let cleanup_start = Instant::now();
// Get all current region IDs from the latest region stats
let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
let mut current_regions = HashSet::new();
for region_stats in table_to_region_stats.values() {
for region_stat in region_stats {
current_regions.insert(region_stat.id);
}
}
// Remove stale entries from tracker
let mut tracker = self.region_gc_tracker.lock().await;
let initial_count = tracker.len();
tracker.retain(|region_id, _| current_regions.contains(region_id));
let removed_count = initial_count - tracker.len();
*self.last_tracker_cleanup.lock().await = now;
info!(
"Completed region GC tracker cleanup: removed {} stale entries out of {} total (retained {}). Duration: {:?}",
removed_count,
initial_count,
tracker.len(),
cleanup_start.elapsed()
);
Ok(())
}
/// Determine if full file listing should be used for a region based on the last full listing time.
pub(crate) async fn should_use_full_listing(&self, region_id: RegionId) -> bool {
let gc_tracker = self.region_gc_tracker.lock().await;
let now = Instant::now();
if let Some(gc_info) = gc_tracker.get(&region_id) {
if let Some(last_full_listing) = gc_info.last_full_listing_time {
let elapsed = now.duration_since(last_full_listing);
elapsed >= self.config.full_file_listing_interval
} else {
// Never did full listing for this region, do it now
true
}
} else {
// First time GC for this region, do full listing
true
}
}
pub(crate) async fn update_full_listing_time(
&self,
region_id: RegionId,
did_full_listing: bool,
) {
let mut gc_tracker = self.region_gc_tracker.lock().await;
let now = Instant::now();
gc_tracker
.entry(region_id)
.and_modify(|info| {
if did_full_listing {
info.last_full_listing_time = Some(now);
}
info.last_gc_time = now;
})
.or_insert_with(|| RegionGcInfo {
last_gc_time: now,
// Avoid forcing a full listing on the first GC of a newly tracked region
last_full_listing_time: Some(now),
});
}
}

View File

@@ -67,6 +67,7 @@ use crate::error::{
StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::gc::{GcSchedulerOptions, GcTickerRef};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
@@ -207,6 +208,8 @@ pub struct MetasrvOptions {
pub event_recorder: EventRecorderOptions,
/// The stats persistence options.
pub stats_persistence: StatsPersistenceOptions,
/// The GC scheduler options.
pub gc: GcSchedulerOptions,
}
impl fmt::Debug for MetasrvOptions {
@@ -303,6 +306,7 @@ impl Default for MetasrvOptions {
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
event_recorder: EventRecorderOptions::default(),
stats_persistence: StatsPersistenceOptions::default(),
gc: GcSchedulerOptions::default(),
}
}
}
@@ -524,6 +528,7 @@ pub struct Metasrv {
table_id_sequence: SequenceRef,
reconciliation_manager: ReconciliationManagerRef,
resource_stat: ResourceStatRef,
gc_ticker: Option<GcTickerRef>,
plugins: Plugins,
}
@@ -584,6 +589,9 @@ impl Metasrv {
if let Some(region_flush_trigger) = &self.region_flush_ticker {
leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
}
if let Some(gc_ticker) = &self.gc_ticker {
leadership_change_notifier.add_listener(gc_ticker.clone() as _);
}
if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
customizer.customize(&mut leadership_change_notifier);
}

View File

@@ -56,6 +56,7 @@ use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::gc::GcScheduler;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::flow_state_handler::FlowStateHandler;
@@ -458,6 +459,22 @@ impl MetasrvBuilder {
None
};
let gc_ticker = if options.gc.enable {
let (gc_scheduler, gc_ticker) = GcScheduler::new_with_config(
table_metadata_manager.clone(),
procedure_manager.clone(),
meta_peer_client.clone(),
mailbox.clone(),
options.grpc.server_addr.clone(),
options.gc.clone(),
)?;
gc_scheduler.try_start()?;
Some(Arc::new(gc_ticker))
} else {
None
};
let customized_region_lease_renewer = plugins
.as_ref()
.and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
@@ -562,6 +579,7 @@ impl MetasrvBuilder {
reconciliation_manager,
topic_stats_registry,
resource_stat: Arc::new(resource_stat),
gc_ticker,
})
}
}

View File

@@ -30,7 +30,7 @@ use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt as _};
use snafu::ResultExt as _;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;
@@ -212,35 +212,9 @@ impl LocalGcWorker {
/// Outdated regions are added to `outdated_regions` set, which means their manifest version in
/// self.file_ref_manifest is older than the current manifest version on datanode.
/// so they need to retry GC later by metasrv with updated tmp ref files.
pub async fn read_tmp_ref_files(
&self,
outdated_regions: &mut HashSet<RegionId>,
) -> Result<HashMap<RegionId, HashSet<FileId>>> {
// verify manifest version before reading tmp ref files
for (region_id, mito_region) in &self.regions {
let current_version = mito_region.manifest_ctx.manifest_version().await;
if &current_version
> self
.file_ref_manifest
.manifest_version
.get(region_id)
.with_context(|| UnexpectedSnafu {
reason: format!(
"Region {} not found in tmp ref manifest version map",
region_id
),
})?
{
outdated_regions.insert(*region_id);
}
}
pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileId>>> {
let mut tmp_ref_files = HashMap::new();
for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
if outdated_regions.contains(region_id) {
// skip outdated regions
continue;
}
tmp_ref_files
.entry(*region_id)
.or_insert_with(HashSet::new)
@@ -259,9 +233,8 @@ impl LocalGcWorker {
info!("LocalGcWorker started");
let now = std::time::Instant::now();
let mut outdated_regions = HashSet::new();
let mut deleted_files = HashMap::new();
let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
let tmp_ref_files = self.read_tmp_ref_files().await?;
for (region_id, region) in &self.regions {
let per_region_time = std::time::Instant::now();
if region.manifest_ctx.current_state() == RegionRoleState::Follower {
@@ -291,7 +264,7 @@ impl LocalGcWorker {
);
let report = GcReport {
deleted_files,
need_retry_regions: outdated_regions.into_iter().collect(),
need_retry_regions: HashSet::new(),
};
Ok(report)
}