From f02bdf54289f238d696dad6b357ffb2ec471cdf2 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Thu, 4 Dec 2025 20:00:25 +0800
Subject: [PATCH] test: gc worker scheduler mock test (#7292)

* feat: gc worker only on local region Signed-off-by: discord9
feat: gc scheduler
wip: gc trigger Signed-off-by: discord9
feat: dn file removal rate Signed-off-by: discord9
feat: trigger gc with stats (WIP) Signed-off-by: discord9
chore Signed-off-by: discord9
also move files ref manifest to store-api Signed-off-by: discord9
feat: basic gc trigger impl Signed-off-by: discord9
wip: handle file ref change Signed-off-by: discord9
refactor: use region ids Signed-off-by: discord9
fix: retry using related regions Signed-off-by: discord9
chore: rm unused Signed-off-by: discord9
fix: update file reference type in GC worker Signed-off-by: discord9
feat: dn gc limiter Signed-off-by: discord9
rename Signed-off-by: discord9
feat: gc scheduler retry with outdated regions Signed-off-by: discord9
feat: use real object store purger Signed-off-by: discord9
wip: add to metasrv Signed-off-by: discord9
feat: add to metasrv Signed-off-by: discord9
feat: datanode gc worker handler Signed-off-by: discord9
fix: no partition col fix Signed-off-by: discord9
fix: RegionId json deser workaround Signed-off-by: discord9
fix: find access layer Signed-off-by: discord9
fix: on host dn Signed-off-by: discord9
fix: stat dedup Signed-off-by: discord9
refactor: rm load-based Signed-off-by: discord9
chore: after rebase fix Signed-off-by: discord9
feat: not full scan Signed-off-by: discord9
chore: after rebase fix Signed-off-by: discord9
feat: clean tracker Signed-off-by: discord9
after rebase fix Signed-off-by: discord9
clippy Signed-off-by: discord9
refactor: split gc scheduler Signed-off-by: discord9
feat: smaller linger time Signed-off-by: discord9
feat: parallel region gc instr Signed-off-by: discord9
chore: rename Signed-off-by: discord9
chore: rename Signed-off-by: discord9
enable is false Signed-off-by: discord9
feat: update removed files precisely Signed-off-by: discord9
all default to false & use local file purger Signed-off-by: discord9
feat: not evict if gc enabled Signed-off-by: discord9
per review Signed-off-by: discord9
fix: pass gc config in mito & test: after truncate gc Signed-off-by: discord9
WIP: one more test Signed-off-by: discord9
test: basic compact Signed-off-by: discord9
test: compact with ref Signed-off-by: discord9
refactor: for easier mock Signed-off-by: discord9
docs: explain race condition Signed-off-by: discord9
feat: gc region procedure Signed-off-by: discord9
refactor: ctx send gc/ref instr with procedure Signed-off-by: discord9
fix: config deser to default Signed-off-by: discord9
refactor: gc report Signed-off-by: discord9
wip: async index file rm Signed-off-by: discord9
fixme?
Signed-off-by: discord9
typo Signed-off-by: discord9
more ut Signed-off-by: discord9
test: more mock test Signed-off-by: discord9
more Signed-off-by: discord9
refactor: split mock test Signed-off-by: discord9
clippy Signed-off-by: discord9
refactor: rm stuff Signed-off-by: discord9
test: mock add gc report per region Signed-off-by: discord9
fix: stricter table failure condition Signed-off-by: discord9
stuff Signed-off-by: discord9
feat: can do different table gc same time & more todos Signed-off-by: discord9
after rebase check Signed-off-by: discord9

* chore Signed-off-by: discord9
* chore Signed-off-by: discord9
* wip: refactoring test Signed-off-by: discord9
* fix: also get from follower peer Signed-off-by: discord9
* test: update mock test Signed-off-by: discord9
* revert some change & clean up Signed-off-by: discord9
* typo Signed-off-by: discord9
* chore: after rebase fix Signed-off-by: discord9
* chore: more fix Signed-off-by: discord9
* revert Signed-off-by: discord9
* revert change to handler.rs Signed-off-by: discord9
* test: fix mock test Signed-off-by: discord9
* chore: rm retry Signed-off-by: discord9
* Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: discord9
* after rebase fix Signed-off-by: discord9
* pcr Signed-off-by: discord9

---------

Signed-off-by: discord9
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 src/meta-srv/src/gc.rs                       |   2 +
 src/meta-srv/src/gc/mock.rs                  | 458 ++++
 src/meta-srv/src/gc/mock/basic.rs            | 164 ++++
 src/meta-srv/src/gc/mock/candidate_select.rs | 390 ++++
 src/meta-srv/src/gc/mock/con.rs              | 516 +++++
 src/meta-srv/src/gc/mock/config.rs           | 197 +++
 src/meta-srv/src/gc/mock/err_handle.rs       | 293 +++++
 src/meta-srv/src/gc/mock/full_list.rs        | 272 ++++
 src/meta-srv/src/gc/mock/integration.rs      | 252 +++
 src/meta-srv/src/gc/mock/misc.rs             | 155 ++
 10 files changed, 2699 insertions(+)
 create mode 100644 src/meta-srv/src/gc/mock.rs
 create mode 100644 src/meta-srv/src/gc/mock/basic.rs
 create mode 100644 src/meta-srv/src/gc/mock/candidate_select.rs
 create mode 100644 src/meta-srv/src/gc/mock/con.rs
 create mode 100644 src/meta-srv/src/gc/mock/config.rs
 create mode 100644 src/meta-srv/src/gc/mock/err_handle.rs
 create mode 100644 src/meta-srv/src/gc/mock/full_list.rs
 create mode 100644 src/meta-srv/src/gc/mock/integration.rs
 create mode 100644 src/meta-srv/src/gc/mock/misc.rs

diff --git a/src/meta-srv/src/gc.rs b/src/meta-srv/src/gc.rs
index 3677e72a41..d8c0adb204 100644
--- a/src/meta-srv/src/gc.rs
+++ b/src/meta-srv/src/gc.rs
@@ -23,6 +23,8 @@ use store_api::storage::RegionId;
 mod candidate;
 mod ctx;
 mod handler;
+#[cfg(test)]
+mod mock;
 mod options;
 mod procedure;
 mod scheduler;
diff --git a/src/meta-srv/src/gc/mock.rs b/src/meta-srv/src/gc/mock.rs
new file mode 100644
index 0000000000..3fdc664495
--- /dev/null
+++ b/src/meta-srv/src/gc/mock.rs
@@ -0,0 +1,458 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and +// limitations under the License. + +mod basic; +mod candidate_select; +mod con; +mod config; +mod err_handle; +mod full_list; +mod integration; +mod misc; + +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use common_meta::datanode::{RegionManifestInfo, RegionStat}; +use common_meta::key::table_route::PhysicalTableRouteValue; +use common_meta::peer::Peer; +use common_meta::rpc::router::{Region, RegionRoute}; +use common_telemetry::debug; +use ordered_float::OrderedFloat; +use store_api::region_engine::RegionRole; +use store_api::storage::{FileRefsManifest, GcReport, RegionId}; +use table::metadata::TableId; +use tokio::sync::mpsc::Sender; + +use crate::error::{Result, UnexpectedSnafu}; +use crate::gc::candidate::GcCandidate; +use crate::gc::ctx::SchedulerCtx; +use crate::gc::handler::Region2Peers; +use crate::gc::options::GcSchedulerOptions; +use crate::gc::scheduler::{Event, GcScheduler}; + +pub const TEST_REGION_SIZE_200MB: u64 = 200_000_000; + +/// Helper function to create an empty GcReport for the given region IDs +pub fn new_empty_report_with(region_ids: impl IntoIterator) -> GcReport { + let mut deleted_files = HashMap::new(); + for region_id in region_ids { + deleted_files.insert(region_id, vec![]); + } + GcReport { + deleted_files, + need_retry_regions: HashSet::new(), + } +} + +#[allow(clippy::type_complexity)] +#[derive(Debug, Default)] +pub struct MockSchedulerCtx { + pub table_to_region_stats: Arc>>>>, + pub table_routes: Arc>>, + pub file_refs: Arc>>, + pub gc_reports: Arc>>, + pub candidates: Arc>>>>, + pub get_table_to_region_stats_calls: Arc>, + pub get_file_references_calls: Arc>, + pub gc_regions_calls: Arc>, + // Error injection fields for testing + pub get_table_to_region_stats_error: Arc>>, + pub get_table_route_error: Arc>>, + pub get_file_references_error: Arc>>, + pub gc_regions_error: Arc>>, + // Retry testing fields + pub gc_regions_retry_count: Arc>>, + pub gc_regions_error_sequence: Arc>>, + pub gc_regions_success_after_retries: Arc>>, + // Per-region error injection + pub gc_regions_per_region_errors: Arc>>, +} + +impl MockSchedulerCtx { + pub fn with_table_routes( + self, + table_routes: HashMap)>, + ) -> Self { + *self.table_routes.lock().unwrap() = table_routes + .into_iter() + .map(|(k, (phy_id, region2peer))| { + let phy = PhysicalTableRouteValue::new( + region2peer + .into_iter() + .map(|(region_id, peer)| RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer), + ..Default::default() + }) + .collect(), + ); + + (k, (phy_id, phy)) + }) + .collect(); + self + } + + /// Set an error to be returned by `get_table_to_region_stats` + #[allow(dead_code)] + pub fn with_get_table_to_region_stats_error(self, error: crate::error::Error) -> Self { + *self.get_table_to_region_stats_error.lock().unwrap() = Some(error); + self + } + + /// Set an error to be returned by `get_table_route` + pub fn set_table_route_error(&self, error: crate::error::Error) { + *self.get_table_route_error.lock().unwrap() = Some(error); + } + + /// Set an error to be returned by `get_file_references` + #[allow(dead_code)] + pub fn with_get_file_references_error(self, error: crate::error::Error) -> Self { + *self.get_file_references_error.lock().unwrap() = Some(error); + self + } + + /// Set an error to be returned by `gc_regions` + pub fn with_gc_regions_error(self, error: crate::error::Error) -> Self { + 
*self.gc_regions_error.lock().unwrap() = Some(error); + self + } + + /// Set a sequence of errors to be returned by `gc_regions` for retry testing + pub fn set_gc_regions_error_sequence(&self, errors: Vec) { + *self.gc_regions_error_sequence.lock().unwrap() = errors; + } + + /// Set success after a specific number of retries for a region + pub fn set_gc_regions_success_after_retries(&self, region_id: RegionId, retries: usize) { + self.gc_regions_success_after_retries + .lock() + .unwrap() + .insert(region_id, retries); + } + + /// Get the retry count for a specific region + pub fn get_retry_count(&self, region_id: RegionId) -> usize { + self.gc_regions_retry_count + .lock() + .unwrap() + .get(®ion_id) + .copied() + .unwrap_or(0) + } + + /// Reset all retry tracking + pub fn reset_retry_tracking(&self) { + *self.gc_regions_retry_count.lock().unwrap() = HashMap::new(); + *self.gc_regions_error_sequence.lock().unwrap() = Vec::new(); + *self.gc_regions_success_after_retries.lock().unwrap() = HashMap::new(); + } + + /// Set an error to be returned for a specific region + pub fn set_gc_regions_error_for_region(&self, region_id: RegionId, error: crate::error::Error) { + self.gc_regions_per_region_errors + .lock() + .unwrap() + .insert(region_id, error); + } + + /// Clear per-region errors + #[allow(unused)] + pub fn clear_gc_regions_per_region_errors(&self) { + self.gc_regions_per_region_errors.lock().unwrap().clear(); + } +} + +#[async_trait::async_trait] +impl SchedulerCtx for MockSchedulerCtx { + async fn get_table_to_region_stats(&self) -> Result>> { + *self.get_table_to_region_stats_calls.lock().unwrap() += 1; + + // Check if we should return an injected error + if let Some(error) = self.get_table_to_region_stats_error.lock().unwrap().take() { + return Err(error); + } + + Ok(self + .table_to_region_stats + .lock() + .unwrap() + .clone() + .unwrap_or_default()) + } + + async fn get_table_route( + &self, + table_id: TableId, + ) -> Result<(TableId, PhysicalTableRouteValue)> { + // Check if we should return an injected error + if let Some(error) = self.get_table_route_error.lock().unwrap().take() { + return Err(error); + } + + Ok(self + .table_routes + .lock() + .unwrap() + .get(&table_id) + .cloned() + .unwrap_or_else(|| (table_id, PhysicalTableRouteValue::default()))) + } + + async fn get_file_references( + &self, + query_regions: &[RegionId], + _related_regions: HashMap>, + region_to_peer: &Region2Peers, + _timeout: Duration, + ) -> Result { + *self.get_file_references_calls.lock().unwrap() += 1; + + // Check if we should return an injected error + if let Some(error) = self.get_file_references_error.lock().unwrap().take() { + return Err(error); + } + if query_regions + .iter() + .any(|region_id| !region_to_peer.contains_key(region_id)) + { + UnexpectedSnafu { + violated: format!( + "region_to_peer{region_to_peer:?} does not contain all region_ids requested: {:?}", + query_regions + ), + }.fail()?; + } + + Ok(self.file_refs.lock().unwrap().clone().unwrap_or_default()) + } + + async fn gc_regions( + &self, + _peer: Peer, + region_ids: &[RegionId], + _file_refs_manifest: &FileRefsManifest, + _full_file_listing: bool, + _timeout: Duration, + ) -> Result { + *self.gc_regions_calls.lock().unwrap() += 1; + + // Check per-region error injection first (for any region) + for ®ion_id in region_ids { + if let Some(error) = self + .gc_regions_per_region_errors + .lock() + .unwrap() + .remove(®ion_id) + { + *self + .gc_regions_retry_count + .lock() + .unwrap() + .entry(region_id) + .or_insert(0) += 1; + 
return Err(error); + } + } + + // Check if we should return an injected error + if let Some(error) = self.gc_regions_error.lock().unwrap().take() { + for region_id in region_ids { + *self + .gc_regions_retry_count + .lock() + .unwrap() + .entry(*region_id) + .or_insert(0) += 1; + } + return Err(error); + } + + // Handle error sequence for retry testing + { + let mut error_sequence = self.gc_regions_error_sequence.lock().unwrap(); + if !error_sequence.is_empty() { + let error = error_sequence.remove(0); + for region_id in region_ids { + *self + .gc_regions_retry_count + .lock() + .unwrap() + .entry(*region_id) + .or_insert(0) += 1; + } + return Err(error); + } + } + + // Build the final report by processing each region individually + let mut final_report = GcReport::default(); + let gc_reports = self.gc_reports.lock().unwrap(); + let success_after_retries = self.gc_regions_success_after_retries.lock().unwrap(); + + for ®ion_id in region_ids { + // Get current retry count for this region + let retry_count = self + .gc_regions_retry_count + .lock() + .unwrap() + .get(®ion_id) + .copied() + .unwrap_or(0); + + // Check if this region should succeed or need retry + if let Some(&required_retries) = success_after_retries.get(®ion_id) { + if retry_count < required_retries { + debug!( + "Region {} needs retry (attempt {}/{})", + region_id, + retry_count + 1, + required_retries + ); + // This region needs more retries - add to need_retry_regions + final_report.need_retry_regions.insert(region_id); + // Track the retry attempt + let mut retry_count_map = self.gc_regions_retry_count.lock().unwrap(); + *retry_count_map.entry(region_id).or_insert(0) += 1; + } else { + debug!( + "Region {} has completed retries - succeeding now", + region_id + ); + // This region has completed all required retries - succeed + if let Some(report) = gc_reports.get(®ion_id) { + final_report.merge(report.clone()); + } + // Track the success attempt + let mut retry_count_map = self.gc_regions_retry_count.lock().unwrap(); + *retry_count_map.entry(region_id).or_insert(0) += 1; + } + } else { + // No retry requirement - check if we have a GC report for this region + if let Some(report) = gc_reports.get(®ion_id) { + // We have a GC report - succeed immediately + final_report.merge(report.clone()); + // Track the success attempt + let mut retry_count_map = self.gc_regions_retry_count.lock().unwrap(); + *retry_count_map.entry(region_id).or_insert(0) += 1; + } else { + // No GC report available - this region should be marked for retry + final_report.need_retry_regions.insert(region_id); + // Track the attempt + let mut retry_count_map = self.gc_regions_retry_count.lock().unwrap(); + *retry_count_map.entry(region_id).or_insert(0) += 1; + } + } + } + + // Return the report with need_retry_regions populated - let the caller handle retry logic + Ok(final_report) + } +} + +pub struct TestEnv { + pub scheduler: GcScheduler, + pub ctx: Arc, + #[allow(dead_code)] + tx: Sender, +} + +#[allow(unused)] +impl TestEnv { + pub fn new() -> Self { + let ctx = Arc::new(MockSchedulerCtx::default()); + let (tx, rx) = GcScheduler::channel(); + let config = GcSchedulerOptions::default(); + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: rx, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + Self { scheduler, ctx, tx } + } + + pub fn with_candidates(self, candidates: HashMap>) -> Self { + *self.ctx.candidates.lock().unwrap() = 
Some(candidates); + self + } + + #[allow(dead_code)] + pub async fn run_scheduler(mut self) { + self.scheduler.run().await; + } + + #[allow(dead_code)] + pub async fn tick(&self) { + self.tx.send(Event::Tick).await.unwrap(); + } +} + +/// Helper function to create a mock GC candidate that will pass the GC threshold +fn new_candidate(region_id: RegionId, score: f64) -> GcCandidate { + // will pass threshold for gc + let region_stat = mock_region_stat(region_id, RegionRole::Leader, 10_000, 10); + + GcCandidate { + region_id, + score: OrderedFloat(score), + region_stat, + } +} + +/// Helper function to create a mock GC candidate +fn mock_candidate(region_id: RegionId) -> GcCandidate { + let region_stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); + GcCandidate { + region_id, + score: ordered_float::OrderedFloat(1.0), + region_stat, + } +} + +/// Helper function to create a mock RegionStat +fn mock_region_stat( + id: RegionId, + role: RegionRole, + approximate_bytes: u64, + sst_num: u64, +) -> RegionStat { + RegionStat { + id, + role, + approximate_bytes, + sst_num, + region_manifest: RegionManifestInfo::Mito { + manifest_version: 0, + flushed_entry_id: 0, + file_removed_cnt: 0, + }, + rcus: 0, + wcus: 0, + engine: "mito".to_string(), + num_rows: 0, + memtable_size: 0, + manifest_size: 0, + sst_size: 0, + index_size: 0, + data_topic_latest_entry_id: 0, + metadata_topic_latest_entry_id: 0, + written_bytes: 0, + } +} diff --git a/src/meta-srv/src/gc/mock/basic.rs b/src/meta-srv/src/gc/mock/basic.rs new file mode 100644 index 0000000000..2cf3679245 --- /dev/null +++ b/src/meta-srv/src/gc/mock/basic.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use common_meta::peer::Peer; +use common_telemetry::init_default_ut_logging; +use store_api::region_engine::RegionRole; +use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId}; + +use crate::gc::mock::{ + MockSchedulerCtx, TEST_REGION_SIZE_200MB, TestEnv, mock_region_stat, new_candidate, +}; +use crate::gc::{GcScheduler, GcSchedulerOptions}; + +#[tokio::test] +async fn test_parallel_process_datanodes_empty() { + let env = TestEnv::new(); + let report = env + .scheduler + .parallel_process_datanodes(HashMap::new()) + .await; + + assert_eq!(report.per_datanode_reports.len(), 0); + assert_eq!(report.failed_datanodes.len(), 0); +} + +#[tokio::test] +async fn test_parallel_process_datanodes_with_candidates() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + let candidates = HashMap::from([(table_id, vec![new_candidate(region_id, 1.0)])]); + + let mut gc_reports = HashMap::new(); + let deleted_files = vec![FileId::random()]; + gc_reports.insert( + region_id, + GcReport { + deleted_files: HashMap::from([(region_id, deleted_files.clone())]), + ..Default::default() + }, + ); + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region_id, 1)]), + ..Default::default() + }; + let ctx = MockSchedulerCtx { + gc_reports: Arc::new(Mutex::new(gc_reports)), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer.clone())]), + )])); + + let env = TestEnv::new(); + // We need to replace the ctx with the one with gc_reports + let mut scheduler = env.scheduler; + scheduler.ctx = Arc::new(ctx); + + // Convert table-based candidates to datanode-based candidates + let datanode_to_candidates = HashMap::from([( + peer, + candidates + .into_iter() + .flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c))) + .collect(), + )]); + + let report = scheduler + .parallel_process_datanodes(datanode_to_candidates) + .await; + + assert_eq!(report.per_datanode_reports.len(), 1); + assert_eq!(report.failed_datanodes.len(), 0); +} + +#[tokio::test] +async fn test_handle_tick() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + let candidates = HashMap::from([(table_id, vec![new_candidate(region_id, 1.0)])]); + + let mut gc_reports = HashMap::new(); + gc_reports.insert(region_id, GcReport::default()); + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region_id, 1)]), + ..Default::default() + }; + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(HashMap::from([( + table_id, + vec![mock_region_stat( + region_id, + RegionRole::Leader, + TEST_REGION_SIZE_200MB, + 10, + )], + )])))), + gc_reports: Arc::new(Mutex::new(gc_reports)), + candidates: Arc::new(Mutex::new(Some(candidates))), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer)]), + )])), + ); + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let 
report = scheduler.handle_tick().await.unwrap(); + + // Validate the returned GcJobReport + assert_eq!( + report.per_datanode_reports.len(), + 1, + "Should process 1 datanode" + ); + assert_eq!( + report.failed_datanodes.len(), + 0, + "Should have 0 failed datanodes" + ); + + assert_eq!(*ctx.get_table_to_region_stats_calls.lock().unwrap(), 1); + assert_eq!(*ctx.get_file_references_calls.lock().unwrap(), 1); + assert_eq!(*ctx.gc_regions_calls.lock().unwrap(), 1); + + let tracker = scheduler.region_gc_tracker.lock().await; + assert!( + tracker.contains_key(®ion_id), + "Tracker should have one region: {:?}", + tracker + ); +} diff --git a/src/meta-srv/src/gc/mock/candidate_select.rs b/src/meta-srv/src/gc/mock/candidate_select.rs new file mode 100644 index 0000000000..73da83802a --- /dev/null +++ b/src/meta-srv/src/gc/mock/candidate_select.rs @@ -0,0 +1,390 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use common_meta::datanode::RegionManifestInfo; +use common_telemetry::init_default_ut_logging; +use store_api::region_engine::RegionRole; +use store_api::storage::RegionId; + +use crate::gc::mock::{MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat}; +use crate::gc::{GcScheduler, GcSchedulerOptions}; + +/// Candidate Selection Tests +#[tokio::test] +async fn test_gc_candidate_filtering_by_role() { + init_default_ut_logging(); + + let table_id = 1; + let leader_region = RegionId::new(table_id, 1); + let follower_region = RegionId::new(table_id, 2); + + let mut leader_stat = mock_region_stat( + leader_region, + RegionRole::Leader, + TEST_REGION_SIZE_200MB, + 10, + ); // 200MB + + let mut follower_stat = mock_region_stat( + follower_region, + RegionRole::Follower, + TEST_REGION_SIZE_200MB, + 10, + ); // 200MB + + // Set up manifest info for scoring + if let RegionManifestInfo::Mito { + file_removed_cnt, .. + } = &mut leader_stat.region_manifest + { + *file_removed_cnt = 5; + } + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut follower_stat.region_manifest + { + *file_removed_cnt = 5; + } + + let table_stats = HashMap::from([(table_id, vec![leader_stat.clone(), follower_stat.clone()])]); + + let ctx = Arc::new(MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + ..Default::default() + }); + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let stats = ctx + .table_to_region_stats + .lock() + .unwrap() + .clone() + .unwrap_or_default(); + + let candidates = scheduler.select_gc_candidates(&stats).await.unwrap(); + + // Should only select leader regions + assert_eq!( + candidates.len(), + 1, + "Expected 1 table with candidates, got {}", + candidates.len() + ); + if let Some(table_candidates) = candidates.get(&table_id) { + assert_eq!( + table_candidates.len(), + 1, + "Expected 1 candidate for table {}, got {}", + table_id, + table_candidates.len() + ); + assert_eq!( + table_candidates[0].region_id, leader_region, + "Expected leader region {}, got {}", + leader_region, table_candidates[0].region_id + ); + } else { + panic!("Expected table {} to have candidates", table_id); + } +} + +#[tokio::test] +async fn test_gc_candidate_size_threshold() { + init_default_ut_logging(); + + let table_id = 1; + let small_region = RegionId::new(table_id, 1); + let large_region = RegionId::new(table_id, 2); + + let mut small_stat = mock_region_stat(small_region, RegionRole::Leader, 50_000_000, 5); // 50MB + if let RegionManifestInfo::Mito { + file_removed_cnt, .. + } = &mut small_stat.region_manifest + { + *file_removed_cnt = 3; + } + + let mut large_stat = + mock_region_stat(large_region, RegionRole::Leader, TEST_REGION_SIZE_200MB, 20); // 200MB + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut large_stat.region_manifest + { + *file_removed_cnt = 5; + } + + let table_stats = HashMap::from([(table_id, vec![small_stat, large_stat])]); + + let ctx = Arc::new(MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + ..Default::default() + }); + + let config = GcSchedulerOptions { + min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default) + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let stats = ctx + .table_to_region_stats + .lock() + .unwrap() + .clone() + .unwrap_or_default(); + + let candidates = scheduler.select_gc_candidates(&stats).await.unwrap(); + + // Should only select large region + assert_eq!( + candidates.len(), + 1, + "Expected 1 table with candidates, got {}", + candidates.len() + ); + if let Some(table_candidates) = candidates.get(&table_id) { + assert_eq!( + table_candidates.len(), + 1, + "Expected 1 candidate for table {}, got {}", + table_id, + table_candidates.len() + ); + assert_eq!( + table_candidates[0].region_id, large_region, + "Expected large region {}, got {}", + large_region, table_candidates[0].region_id + ); + } else { + panic!("Expected table {} to have candidates", table_id); + } +} + +#[tokio::test] +async fn test_gc_candidate_scoring() { + init_default_ut_logging(); + + let table_id = 1; + let low_score_region = RegionId::new(table_id, 1); + let high_score_region = RegionId::new(table_id, 2); + + let mut low_stat = mock_region_stat( + low_score_region, + RegionRole::Leader, + TEST_REGION_SIZE_200MB, + 5, + ); // 200MB + // Set low file removal rate for low_score_region + if let RegionManifestInfo::Mito { + file_removed_cnt, .. + } = &mut low_stat.region_manifest + { + *file_removed_cnt = 2; + } + + let mut high_stat = mock_region_stat( + high_score_region, + RegionRole::Leader, + TEST_REGION_SIZE_200MB, + 50, + ); // 200MB + // Set high file removal rate for high_score_region + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut high_stat.region_manifest + { + *file_removed_cnt = 20; + } + + let table_stats = HashMap::from([(table_id, vec![low_stat, high_stat])]); + + let ctx = Arc::new(MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + ..Default::default() + }); + + let config = GcSchedulerOptions { + sst_count_weight: 1.0, + file_removed_count_weight: 0.5, + min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default) + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let stats = ctx + .table_to_region_stats + .lock() + .unwrap() + .clone() + .unwrap_or_default(); + + let candidates = scheduler.select_gc_candidates(&stats).await.unwrap(); + + // Should select both regions but high score region should be first + assert_eq!( + candidates.len(), + 1, + "Expected 1 table with candidates, got {}", + candidates.len() + ); + if let Some(table_candidates) = candidates.get(&table_id) { + assert_eq!( + table_candidates.len(), + 2, + "Expected 2 candidates for table {}, got {}", + table_id, + table_candidates.len() + ); + // Higher score region should come first (sorted by score descending) + assert_eq!( + table_candidates[0].region_id, high_score_region, + "High score region should be first" + ); + assert!( + table_candidates[0].score > table_candidates[1].score, + "High score region should have higher score: {} > {}", + table_candidates[0].score, + table_candidates[1].score + ); + } else { + panic!("Expected table {} to have candidates", table_id); + } +} + +#[tokio::test] +async fn test_gc_candidate_regions_per_table_threshold() { + init_default_ut_logging(); + + let table_id = 1; + // Create 10 regions for the same table + let mut region_stats = Vec::new(); + + for i in 0..10 { + let region_id = RegionId::new(table_id, i + 1); + let mut stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 20); // 200MB + + // Set different file removal rates to create different scores + // Higher region IDs get higher scores (better GC candidates) + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut stat.region_manifest + { + *file_removed_cnt = (i as u64 + 1) * 2; // Region 1: 2, Region 2: 4, ..., Region 10: 20 + } + + region_stats.push(stat); + } + + let table_stats = HashMap::from([(table_id, region_stats)]); + + let ctx = Arc::new(MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + ..Default::default() + }); + + // Set regions_per_table_threshold to 3 + let config = GcSchedulerOptions { + regions_per_table_threshold: 3, + min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default) + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let stats = ctx + .table_to_region_stats + .lock() + .unwrap() + .clone() + .unwrap_or_default(); + + let candidates = scheduler.select_gc_candidates(&stats).await.unwrap(); + + // Should have 1 table with candidates + assert_eq!( + candidates.len(), + 1, + "Expected 1 table with candidates, got {}", + candidates.len() + ); + + if let Some(table_candidates) = candidates.get(&table_id) { + // Should only have 3 candidates due to regions_per_table_threshold + assert_eq!( + table_candidates.len(), + 3, + "Expected 3 candidates for table {} due to regions_per_table_threshold, got {}", + table_id, + table_candidates.len() + ); + + // Verify that the top 3 scoring regions are selected + // Regions 8, 9, 10 should have the highest scores (file_removed_cnt: 16, 18, 20) + // They should be returned in descending order by score + let expected_regions = vec![10, 9, 8]; + let actual_regions: Vec = table_candidates + .iter() + .map(|c| c.region_id.region_number()) + .collect(); + + assert_eq!( + actual_regions, expected_regions, + "Expected regions {:?} to be selected, got {:?}", + expected_regions, actual_regions + ); + + // Verify they are sorted by score in descending order + for i in 0..table_candidates.len() - 1 { + assert!( + table_candidates[i].score >= table_candidates[i + 1].score, + "Candidates should be sorted by score descending: {} >= {}", + table_candidates[i].score, + table_candidates[i + 1].score + ); + } + } else { + panic!("Expected table {} to have candidates", table_id); + } +} diff --git a/src/meta-srv/src/gc/mock/con.rs b/src/meta-srv/src/gc/mock/con.rs new file mode 100644 index 0000000000..2bef9b9896 --- /dev/null +++ b/src/meta-srv/src/gc/mock/con.rs @@ -0,0 +1,516 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use common_meta::key::table_route::PhysicalTableRouteValue; +use common_meta::peer::Peer; +use common_meta::rpc::router::{Region, RegionRoute}; +use common_telemetry::{info, init_default_ut_logging}; +use store_api::region_engine::RegionRole; +use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId}; + +use crate::gc::mock::{ + MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_candidate, mock_region_stat, new_candidate, +}; +use crate::gc::{GcScheduler, GcSchedulerOptions}; + +/// Concurrent Processing Tests +#[tokio::test] +async fn test_concurrent_table_processing_limits() { + init_default_ut_logging(); + + let mut candidates = HashMap::new(); + let mut gc_reports = HashMap::new(); + + // Create many tables with candidates + for table_id in 1..=10 { + let region_id = RegionId::new(table_id, 1); + candidates.insert(table_id, vec![new_candidate(region_id, 1.0)]); + gc_reports.insert( + region_id, + GcReport { + deleted_files: HashMap::from([(region_id, vec![FileId::random()])]), + ..Default::default() + }, + ); + } + + let ctx = MockSchedulerCtx { + candidates: Arc::new(Mutex::new(Some(candidates))), + file_refs: Arc::new(Mutex::new(Some(FileRefsManifest { + manifest_version: (1..=10).map(|i| (RegionId::new(i, 1), 1)).collect(), + ..Default::default() + }))), + gc_reports: Arc::new(Mutex::new(gc_reports)), + ..Default::default() + } + .with_table_routes( + (1..=10) + .map(|table_id| { + let region_id = RegionId::new(table_id, 1); + (table_id, (table_id, vec![(region_id, Peer::new(1, ""))])) + }) + .collect(), + ); + + let ctx = Arc::new(ctx); + + let config = GcSchedulerOptions { + max_concurrent_tables: 3, // Set a low limit + retry_backoff_duration: Duration::from_millis(50), // for faster test + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let candidates = ctx.candidates.lock().unwrap().clone().unwrap_or_default(); + + // Convert table-based candidates to datanode-based candidates + let peer = Peer::new(1, ""); + let datanode_to_candidates = HashMap::from([( + peer, + candidates + .into_iter() + .flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c))) + .collect(), + )]); + + let report = scheduler + .parallel_process_datanodes(datanode_to_candidates) + .await; + + // Should process all datanodes + assert_eq!(report.per_datanode_reports.len(), 1); + assert_eq!(report.failed_datanodes.len(), 0); +} + +#[tokio::test] +async fn test_datanode_processes_tables_with_partial_gc_failures() { + init_default_ut_logging(); + + let table1 = 1; + let region1 = RegionId::new(table1, 1); + let table2 = 2; + let region2 = RegionId::new(table2, 1); + let peer = Peer::new(1, ""); + + let mut candidates = HashMap::new(); + candidates.insert(table1, vec![new_candidate(region1, 1.0)]); + candidates.insert(table2, vec![new_candidate(region2, 1.0)]); + + // Set up GC reports for success and failure + let mut gc_reports = HashMap::new(); + gc_reports.insert( + region1, + GcReport { + deleted_files: HashMap::from([(region1, vec![])]), + ..Default::default() + }, + ); + // region2 will have no GC report, simulating failure + + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region1, 1), (region2, 1)]), 
+ ..Default::default() + }; + + let ctx = Arc::new( + MockSchedulerCtx { + gc_reports: Arc::new(Mutex::new(gc_reports)), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + candidates: Arc::new(Mutex::new(Some(candidates))), + ..Default::default() + } + .with_table_routes(HashMap::from([ + (table1, (table1, vec![(region1, peer.clone())])), + (table2, (table2, vec![(region2, peer.clone())])), + ])), + ); + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let candidates = ctx.candidates.lock().unwrap().clone().unwrap_or_default(); + + // Convert table-based candidates to datanode-based candidates + + let datanode_to_candidates = HashMap::from([( + peer, + candidates + .into_iter() + .flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c))) + .collect(), + )]); + + let report = scheduler + .parallel_process_datanodes(datanode_to_candidates) + .await; + + // Should have one datanode with mixed results + assert_eq!(report.per_datanode_reports.len(), 1); + // also check one failed region (region2 has no GC report, so it should be in need_retry_regions) + let datanode_report = report.per_datanode_reports.values().next().unwrap(); + assert_eq!(datanode_report.need_retry_regions.len(), 1); + assert_eq!(report.failed_datanodes.len(), 0); +} + +// Region Concurrency Tests + +#[tokio::test] +async fn test_region_gc_concurrency_limit() { + init_default_ut_logging(); + + let table_id = 1; + let peer = Peer::new(1, ""); + + // Create multiple regions for the same table + let mut region_stats = Vec::new(); + let mut candidates = Vec::new(); + let mut gc_reports = HashMap::new(); + + for i in 1..=10 { + let region_id = RegionId::new(table_id, i as u32); + let region_stat = + mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + region_stats.push(region_stat); + + candidates.push(mock_candidate(region_id)); + + gc_reports.insert( + region_id, + GcReport { + deleted_files: HashMap::from([( + region_id, + vec![FileId::random(), FileId::random()], + )]), + ..Default::default() + }, + ); + } + + let table_stats = HashMap::from([(table_id, region_stats)]); + + let file_refs = FileRefsManifest { + manifest_version: (1..=10) + .map(|i| (RegionId::new(table_id, i as u32), 1)) + .collect(), + ..Default::default() + }; + + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + gc_reports: Arc::new(Mutex::new(gc_reports)), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + ( + table_id, + (1..=10) + .map(|i| (RegionId::new(table_id, i as u32), peer.clone())) + .collect(), + ), + )])), + ); + + // Configure low concurrency limit + let config = GcSchedulerOptions { + region_gc_concurrency: 3, // Only 3 regions can be processed concurrently + retry_backoff_duration: Duration::from_millis(50), // for faster test + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let start_time = Instant::now(); + let report = scheduler + .process_datanode_gc( + peer, + 
candidates.into_iter().map(|c| (table_id, c)).collect(), + ) + .await + .unwrap(); + let duration = start_time.elapsed(); + + // All regions should be processed successfully + // Check that all 10 regions have deleted files + assert_eq!(report.deleted_files.len(), 10); + for i in 1..=10 { + let region_id = RegionId::new(table_id, i as u32); + assert!(report.deleted_files.contains_key(®ion_id)); + assert_eq!(report.deleted_files[®ion_id].len(), 2); // Each region has 2 deleted files + } + assert!(report.need_retry_regions.is_empty()); + + // Verify that concurrency limit was respected (this is hard to test directly, + // but we can verify that the processing completed successfully) + info!( + "Processed 10 regions with concurrency limit 3 in {:?}", + duration + ); +} + +#[tokio::test] +async fn test_region_gc_concurrency_with_partial_failures() { + init_default_ut_logging(); + + let table_id = 1; + let peer = Peer::new(1, ""); + + // Create multiple regions with mixed success/failure + let mut region_stats = Vec::new(); + let mut candidates = Vec::new(); + let mut gc_reports = HashMap::new(); + + // Create the context first so we can set errors on it + let ctx = Arc::new(MockSchedulerCtx::default()); + + for i in 1..=6 { + let region_id = RegionId::new(table_id, i as u32); + let region_stat = + mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + region_stats.push(region_stat); + + candidates.push(mock_candidate(region_id)); + + if i % 2 == 0 { + // Even regions will succeed + gc_reports.insert( + region_id, + GcReport { + deleted_files: HashMap::from([( + region_id, + vec![FileId::random(), FileId::random()], + )]), + ..Default::default() + }, + ); + } else { + // Odd regions will fail - don't add them to gc_reports + // This will cause them to be marked as needing retry + } + } + + let table_stats = HashMap::from([(table_id, region_stats)]); + + let file_refs = FileRefsManifest { + manifest_version: (1..=6) + .map(|i| (RegionId::new(table_id, i as u32), 1)) + .collect(), + ..Default::default() + }; + + // Update the context with the data + *ctx.table_to_region_stats.lock().unwrap() = Some(table_stats); + *ctx.gc_reports.lock().unwrap() = gc_reports; + *ctx.file_refs.lock().unwrap() = Some(file_refs); + let region_routes = (1..=6) + .map(|i| RegionRoute { + region: Region::new_test(RegionId::new(table_id, i as u32)), + leader_peer: Some(peer.clone()), + ..Default::default() + }) + .collect(); + + *ctx.table_routes.lock().unwrap() = HashMap::from([( + table_id, + (table_id, PhysicalTableRouteValue::new(region_routes)), + )]); + + // Configure concurrency limit + let config = GcSchedulerOptions { + region_gc_concurrency: 2, // Process 2 regions concurrently + retry_backoff_duration: Duration::from_millis(50), // for faster test + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let datanode_to_candidates = HashMap::from([( + peer.clone(), + candidates.into_iter().map(|c| (table_id, c)).collect(), + )]); + + let report = scheduler + .parallel_process_datanodes(datanode_to_candidates) + .await; + + let report = report.per_datanode_reports.get(&peer.id).unwrap(); + + // Should have 3 successful and 3 failed regions + // Even regions (2, 4, 6) should succeed, odd regions (1, 3, 5) should fail + let mut successful_regions = 0; + 
let mut failed_regions = 0; + + for i in 1..=6 { + let region_id = RegionId::new(table_id, i as u32); + if i % 2 == 0 { + // Even regions should succeed + if report.deleted_files.contains_key(®ion_id) { + successful_regions += 1; + } + } else { + // Odd regions should fail - they should be in need_retry_regions + if report.need_retry_regions.contains(®ion_id) { + failed_regions += 1; + } + } + } + + // In the new implementation, regions that cause gc_regions to return an error + // are added to need_retry_regions. Let's check if we have the expected mix. + info!( + "Successful regions: {}, Failed regions: {}", + successful_regions, failed_regions + ); + info!( + "Deleted files: {:?}", + report.deleted_files.keys().collect::>() + ); + info!("Need retry regions: {:?}", report.need_retry_regions); + + // The exact count might vary depending on how the mock handles errors, + // but we should have some successful and some failed regions + assert!( + successful_regions > 0, + "Should have at least some successful regions" + ); + assert!( + failed_regions > 0, + "Should have at least some failed regions" + ); +} + +#[tokio::test] +async fn test_region_gc_concurrency_with_retryable_errors() { + init_default_ut_logging(); + + let table_id = 1; + let peer = Peer::new(1, ""); + + // Create multiple regions + let mut region_stats = Vec::new(); + let mut candidates = Vec::new(); + + for i in 1..=5 { + let region_id = RegionId::new(table_id, i as u32); + let region_stat = + mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + region_stats.push(region_stat); + candidates.push(mock_candidate(region_id)); + } + + let table_stats = HashMap::from([(table_id, region_stats)]); + + let file_refs = FileRefsManifest { + manifest_version: (1..=5) + .map(|i| (RegionId::new(table_id, i as u32), 1)) + .collect(), + ..Default::default() + }; + + let gc_report = (1..=5) + .map(|i| { + let region_id = RegionId::new(table_id, i as u32); + ( + region_id, + // mock the actual gc report with deleted files when succeeded(even no files to delete) + GcReport::new(HashMap::from([(region_id, vec![])]), HashSet::new()), + ) + }) + .collect(); + + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + gc_reports: Arc::new(Mutex::new(gc_report)), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + ( + table_id, + (1..=5) + .map(|i| (RegionId::new(table_id, i as u32), peer.clone())) + .collect(), + ), + )])), + ); + + // Configure concurrency limit + let config = GcSchedulerOptions { + region_gc_concurrency: 2, // Process 2 regions concurrently + retry_backoff_duration: Duration::from_millis(50), + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let datanode_to_candidates = HashMap::from([( + peer.clone(), + candidates.into_iter().map(|c| (table_id, c)).collect(), + )]); + let report = scheduler + .parallel_process_datanodes(datanode_to_candidates) + .await; + + let report = report.per_datanode_reports.get(&peer.id).unwrap(); + + // In the new implementation without retry logic, all regions should be processed + // The exact behavior depends on how the mock handles the regions + info!( + "Deleted files: {:?}", + 
report.deleted_files.keys().collect::>() + ); + info!("Need retry regions: {:?}", report.need_retry_regions); + + // We should have processed all 5 regions in some way + let total_processed = report.deleted_files.len() + report.need_retry_regions.len(); + assert_eq!(total_processed, 5, "Should have processed all 5 regions"); +} diff --git a/src/meta-srv/src/gc/mock/config.rs b/src/meta-srv/src/gc/mock/config.rs new file mode 100644 index 0000000000..f4ec9be948 --- /dev/null +++ b/src/meta-srv/src/gc/mock/config.rs @@ -0,0 +1,197 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use common_meta::datanode::RegionManifestInfo; +use common_telemetry::init_default_ut_logging; +use store_api::region_engine::RegionRole; +use store_api::storage::RegionId; + +use crate::gc::mock::{MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat}; +use crate::gc::{GcScheduler, GcSchedulerOptions}; + +/// Configuration Tests +#[tokio::test] +async fn test_different_gc_weights() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + + let mut region_stat = + mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB to pass size threshold + + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut region_stat.region_manifest + { + *file_removed_cnt = 5; + } + + let table_stats = HashMap::from([(table_id, vec![region_stat])]); + + let ctx = Arc::new(MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + ..Default::default() + }); + + // Test with different weights + let config1 = GcSchedulerOptions { + sst_count_weight: 2.0, + file_removed_count_weight: 0.5, + min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default) + ..Default::default() + }; + + let scheduler1 = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: config1, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let stats = ctx + .table_to_region_stats + .lock() + .unwrap() + .clone() + .unwrap_or_default(); + + let candidates1 = scheduler1.select_gc_candidates(&stats).await.unwrap(); + + let config2 = GcSchedulerOptions { + sst_count_weight: 0.5, + file_removed_count_weight: 2.0, + min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default) + ..Default::default() + }; + + let scheduler2 = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: config2, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let stats = &ctx + .table_to_region_stats + .lock() + .unwrap() + .clone() + .unwrap_or_default(); + let candidates2 = scheduler2.select_gc_candidates(stats).await.unwrap(); + + // Both should select the region but with different scores + assert_eq!( + candidates1.len(), + 1, + "Expected 1 table with candidates for config1, got {}", + candidates1.len() + ); + assert_eq!( + candidates2.len(), + 1, + "Expected 1 table with candidates for config2, got {}", + candidates2.len() + ); + + // Verify the region is actually selected + assert!( + candidates1.contains_key(&table_id), + "Config1 should contain table_id {}", + table_id + ); + assert!( + candidates2.contains_key(&table_id), + "Config2 should contain table_id {}", + table_id + ); +} + +#[tokio::test] +async fn test_regions_per_table_threshold() { + init_default_ut_logging(); + + let table_id = 1; + let mut region_stats = Vec::new(); + + // Create many regions + for i in 1..=10 { + let region_id = RegionId::new(table_id, i as u32); + let mut stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut stat.region_manifest + { + *file_removed_cnt = 5; + } + + region_stats.push(stat); + } + + let table_stats = HashMap::from([(table_id, region_stats)]); + + let ctx = Arc::new(MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + ..Default::default() + }); + + let config = GcSchedulerOptions { + regions_per_table_threshold: 3, // Limit to 3 regions per table + min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default) + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let stats = ctx + .table_to_region_stats + .lock() + .unwrap() + .clone() + .unwrap_or_default(); + + let candidates = scheduler.select_gc_candidates(&stats).await.unwrap(); + + assert_eq!( + candidates.len(), + 1, + "Expected 1 table with candidates, got {}", + candidates.len() + ); + if let Some(table_candidates) = candidates.get(&table_id) { + // Should be limited to 3 regions + assert_eq!( + table_candidates.len(), + 3, + "Expected 3 candidates for table {}, got {}", + table_id, + table_candidates.len() + ); + } else { + panic!("Expected table {} to have candidates", table_id); + } +} diff --git a/src/meta-srv/src/gc/mock/err_handle.rs b/src/meta-srv/src/gc/mock/err_handle.rs new file mode 100644 index 0000000000..952671006d --- /dev/null +++ b/src/meta-srv/src/gc/mock/err_handle.rs @@ -0,0 +1,293 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use common_meta::datanode::RegionManifestInfo; +use common_meta::peer::Peer; +use common_telemetry::init_default_ut_logging; +use store_api::region_engine::RegionRole; +use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId}; + +use crate::gc::mock::{ + MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat, new_empty_report_with, +}; +use crate::gc::{GcScheduler, GcSchedulerOptions}; + +/// Error Handling Tests +#[tokio::test] +async fn test_gc_regions_failure_handling() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + + // Create region stat with proper size and file_removed_cnt to ensure it gets selected as candidate + let mut region_stat = + mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut region_stat.region_manifest + { + *file_removed_cnt = 5; + } + + let table_stats = HashMap::from([(table_id, vec![region_stat])]); + + // Create a context that will return an error for gc_regions + let mut gc_reports = HashMap::new(); + gc_reports.insert(region_id, GcReport::default()); + + // Inject an error for gc_regions method + let gc_error = crate::error::UnexpectedSnafu { + violated: "Simulated GC failure for testing".to_string(), + } + .build(); + + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region_id, 1)]), + file_refs: HashMap::from([(region_id, HashSet::from([FileId::random()]))]), + }; + + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + gc_reports: Arc::new(Mutex::new(gc_reports)), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer)]), + )])) + .with_gc_regions_error(gc_error), + ); + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + // This should handle the failure gracefully + let report = scheduler.handle_tick().await.unwrap(); + + // Validate the report shows the failure handling + assert_eq!( + report.per_datanode_reports.len(), + 1, + "Should process 1 datanode despite failure" + ); + assert_eq!( + report.failed_datanodes.len(), + 0, + "Should have 0 failed datanodes (failure handled via need_retry_regions)" + ); + + // Check that the region is in need_retry_regions due to the failure + let datanode_report = report.per_datanode_reports.values().next().unwrap(); + assert_eq!( + datanode_report.need_retry_regions.len(), + 1, + "Should have 1 region in need_retry_regions due to failure" + ); + assert!( + datanode_report.need_retry_regions.contains(®ion_id), + "Region should be in need_retry_regions" + ); + + // Verify that calls were made despite potential failures + assert_eq!( + *ctx.get_table_to_region_stats_calls.lock().unwrap(), + 1, + "Expected 1 call to get_table_to_region_stats" + ); + assert!( + *ctx.get_file_references_calls.lock().unwrap() >= 1, + "Expected at least 1 call to get_file_references" + ); + assert!( + *ctx.gc_regions_calls.lock().unwrap() >= 1, + "Expected at least 1 call to gc_regions" + ); +} + +#[tokio::test] +async fn test_get_file_references_failure() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + + // Create region stat with proper size and file_removed_cnt to ensure it gets selected as candidate + let mut region_stat = + mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut region_stat.region_manifest + { + *file_removed_cnt = 5; + } + + let table_stats = HashMap::from([(table_id, vec![region_stat])]); + + // Create context with empty file refs (simulating failure) + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + file_refs: Arc::new(Mutex::new(Some(FileRefsManifest::default()))), + gc_reports: Arc::new(Mutex::new(HashMap::from([( + region_id, + new_empty_report_with([region_id]), + )]))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer)]), + )])), + ); + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: GcSchedulerOptions { + retry_backoff_duration: Duration::from_millis(10), // shorten for test + ..Default::default() + }, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let report = scheduler.handle_tick().await.unwrap(); + + // Validate the report shows the expected results + // In the new implementation, even if get_file_references fails, we still create a datanode report + assert_eq!( + report.per_datanode_reports.len(), + 1, + "Should process 1 datanode" + ); + assert_eq!( + report.failed_datanodes.len(), + 0, + "Should have 0 failed datanodes (failure handled gracefully)" + ); + + // The region should be processed but may have empty results due to file refs failure + let datanode_report = report.per_datanode_reports.values().next().unwrap(); + // The current implementation still processes the region even with file refs failure + // and creates an empty entry in deleted_files + assert!( + datanode_report.deleted_files.contains_key(®ion_id), + "Should have region in deleted_files (even if empty)" + ); + assert!( + datanode_report.deleted_files[®ion_id].is_empty(), + "Should have empty deleted files due to file refs failure" + ); + + // Should still attempt to get file references (may be called multiple times due to retry logic) + assert!( + *ctx.get_file_references_calls.lock().unwrap() >= 1, + "Expected at least 1 call to get_file_references, got {}", + *ctx.get_file_references_calls.lock().unwrap() + ); +} + +#[tokio::test] +async fn test_get_table_route_failure() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + + // Create region stat with proper size and file_removed_cnt to ensure it gets selected as candidate + let mut region_stat = + mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut region_stat.region_manifest + { + *file_removed_cnt = 5; + } + + let table_stats = HashMap::from([(table_id, vec![region_stat])]); + + // Inject an error for get_table_route method to simulate failure + let route_error = crate::error::UnexpectedSnafu { + violated: "Simulated table route failure for testing".to_string(), + } + .build(); + + // Create context with table route error injection + let ctx = Arc::new(MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + ..Default::default() + }); + ctx.set_table_route_error(route_error); + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + // Get candidates first + let stats = &ctx + .table_to_region_stats + .lock() + .unwrap() + .clone() + .unwrap_or_default(); + let candidates = scheduler.select_gc_candidates(stats).await.unwrap(); + + // Convert table-based candidates to datanode-based candidates + let datanode_to_candidates = HashMap::from([( + Peer::new(1, ""), + candidates + .into_iter() + .flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c))) + .collect(), + )]); + + // This should handle table route failure gracefully + let report = scheduler + .parallel_process_datanodes(datanode_to_candidates) + .await; + + // Should process the datanode but handle route error gracefully + assert_eq!( + report.per_datanode_reports.len(), + 0, + "Expected 0 datanode report" + ); + assert_eq!( + report.failed_datanodes.len(), + 1, + "Expected 1 failed datanodes (route error handled gracefully)" + ); + assert!( + report.failed_datanodes.contains_key(&1), + "Failed datanodes should contain the datanode with route error" + ); +} diff --git a/src/meta-srv/src/gc/mock/full_list.rs b/src/meta-srv/src/gc/mock/full_list.rs new file mode 100644 index 0000000000..649334938a --- /dev/null +++ b/src/meta-srv/src/gc/mock/full_list.rs @@ -0,0 +1,272 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
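+
+//! Full file listing tests for the GC scheduler: the first GC of a region
+//! always performs a full listing, and later GC rounds repeat it only after
+//! `full_file_listing_interval` has elapsed, as recorded in the region GC
+//! tracker's `last_full_listing_time`.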
+ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use common_meta::peer::Peer; +use common_telemetry::init_default_ut_logging; +use store_api::region_engine::RegionRole; +use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId}; + +use crate::gc::mock::{MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_candidate, mock_region_stat}; +use crate::gc::{GcScheduler, GcSchedulerOptions}; + +// Full File Listing Tests + +#[tokio::test] +async fn test_full_file_listing_first_time_gc() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + + let region_stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + let table_stats = HashMap::from([(table_id, vec![region_stat])]); + + let gc_report = GcReport { + deleted_files: HashMap::from([(region_id, vec![FileId::random(), FileId::random()])]), + ..Default::default() + }; + + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region_id, 1)]), + ..Default::default() + }; + + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + gc_reports: Arc::new(Mutex::new(HashMap::from([(region_id, gc_report)]))), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer.clone())]), + )])), + ); + + // Configure short full file listing interval for testing + let config = GcSchedulerOptions { + full_file_listing_interval: Duration::from_secs(3600), // 1 hour + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + // First GC - should use full listing since region has never been GC'd + let reports = scheduler + .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .await + .unwrap(); + + assert_eq!(reports.deleted_files.len(), 1); + + // Verify that full listing was used by checking the tracker + let tracker = scheduler.region_gc_tracker.lock().await; + let gc_info = tracker + .get(®ion_id) + .expect("Region should be in tracker"); + assert!( + gc_info.last_full_listing_time.is_some(), + "First GC should use full listing" + ); +} + +#[tokio::test] +async fn test_full_file_listing_interval_enforcement() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + + let region_stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + let table_stats = HashMap::from([(table_id, vec![region_stat])]); + + let gc_report = GcReport { + deleted_files: HashMap::from([(region_id, vec![FileId::random(), FileId::random()])]), + ..Default::default() + }; + + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region_id, 1)]), + ..Default::default() + }; + + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + gc_reports: Arc::new(Mutex::new(HashMap::from([(region_id, gc_report)]))), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer.clone())]), + )])), + ); + + // Configure very 
short full file listing interval for testing + let config = GcSchedulerOptions { + full_file_listing_interval: Duration::from_millis(100), // 100ms + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + // First GC - should use full listing + let reports1 = scheduler + .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .await + .unwrap(); + assert_eq!(reports1.deleted_files.len(), 1); + + // Get the first full listing time + let first_full_listing_time = { + let tracker = scheduler.region_gc_tracker.lock().await; + let gc_info = tracker + .get(®ion_id) + .expect("Region should be in tracker"); + gc_info + .last_full_listing_time + .expect("Should have full listing time") + }; + + // Wait for interval to pass + tokio::time::sleep(Duration::from_millis(150)).await; + + // Second GC - should use full listing again since interval has passed + let _reports2 = scheduler + .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .await + .unwrap(); + + // Verify that full listing was used again + let tracker = scheduler.region_gc_tracker.lock().await; + let gc_info = tracker + .get(®ion_id) + .expect("Region should be in tracker"); + let second_full_listing_time = gc_info + .last_full_listing_time + .expect("Should have full listing time"); + + assert!( + second_full_listing_time > first_full_listing_time, + "Second GC should update full listing time" + ); +} + +#[tokio::test] +async fn test_full_file_listing_no_interval_passed() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + + let region_stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + let table_stats = HashMap::from([(table_id, vec![region_stat])]); + + let gc_report = GcReport { + deleted_files: HashMap::from([(region_id, vec![FileId::random(), FileId::random()])]), + ..Default::default() + }; + + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region_id, 1)]), + ..Default::default() + }; + + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + gc_reports: Arc::new(Mutex::new(HashMap::from([(region_id, gc_report)]))), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer.clone())]), + )])), + ); + + // Configure long full file listing interval + let config = GcSchedulerOptions { + full_file_listing_interval: Duration::from_secs(3600), // 1 hour + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + // First GC - should use full listing + let reports1 = scheduler + .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .await + .unwrap(); + assert_eq!(reports1.deleted_files.len(), 1); + + // Get the first full listing time + let first_full_listing_time = { + let tracker = scheduler.region_gc_tracker.lock().await; + let gc_info = tracker + .get(®ion_id) + .expect("Region should be in tracker"); + 
gc_info + .last_full_listing_time + .expect("Should have full listing time") + }; + + // Second GC immediately - should NOT use full listing since interval hasn't passed + let reports2 = scheduler + .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .await + .unwrap(); + assert_eq!(reports2.deleted_files.len(), 1); + + // Verify that full listing time was NOT updated + let tracker = scheduler.region_gc_tracker.lock().await; + let gc_info = tracker + .get(®ion_id) + .expect("Region should be in tracker"); + let second_full_listing_time = gc_info + .last_full_listing_time + .expect("Should have full listing time"); + + assert_eq!( + second_full_listing_time, first_full_listing_time, + "Second GC should not update full listing time when interval hasn't passed" + ); +} diff --git a/src/meta-srv/src/gc/mock/integration.rs b/src/meta-srv/src/gc/mock/integration.rs new file mode 100644 index 0000000000..a64fca1e19 --- /dev/null +++ b/src/meta-srv/src/gc/mock/integration.rs @@ -0,0 +1,252 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use common_meta::datanode::RegionManifestInfo; +use common_meta::peer::Peer; +use common_telemetry::init_default_ut_logging; +use store_api::region_engine::RegionRole; +use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId}; + +use crate::gc::mock::{ + MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat, new_empty_report_with, +}; +use crate::gc::{GcScheduler, GcSchedulerOptions}; + +// Integration Flow Tests + +#[tokio::test] +async fn test_full_gc_workflow() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + + let mut region_stat = + mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut region_stat.region_manifest + { + *file_removed_cnt = 5; + } + + let table_stats = HashMap::from([(table_id, vec![region_stat])]); + + let mut gc_reports = HashMap::new(); + gc_reports.insert( + region_id, + GcReport { + deleted_files: HashMap::from([(region_id, vec![FileId::random(), FileId::random()])]), + ..Default::default() + }, + ); + + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region_id, 1)]), + ..Default::default() + }; + + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + gc_reports: Arc::new(Mutex::new(gc_reports)), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer)]), + )])), + ); + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + // Run the full workflow + let report = scheduler.handle_tick().await.unwrap(); + + // Validate the returned GcJobReport - should have 1 datanode report + assert_eq!( + report.per_datanode_reports.len(), + 1, + "Should process 1 datanode" + ); + assert_eq!( + report.failed_datanodes.len(), + 0, + "Should have no failed datanodes" + ); + + // Get the datanode report + let datanode_report = report.per_datanode_reports.values().next().unwrap(); + + // Check that the region was processed successfully + assert!( + datanode_report.deleted_files.contains_key(®ion_id), + "Should have deleted files for region" + ); + assert_eq!( + datanode_report.deleted_files[®ion_id].len(), + 2, + "Should have 2 deleted files" + ); + assert!( + datanode_report.need_retry_regions.is_empty(), + "Should have no retry regions" + ); + + // Verify all steps were executed + assert_eq!( + *ctx.get_table_to_region_stats_calls.lock().unwrap(), + 1, + "Expected 1 call to get_table_to_region_stats" + ); + assert_eq!( + *ctx.get_file_references_calls.lock().unwrap(), + 1, + "Expected 1 call to get_file_references" + ); + assert_eq!( + *ctx.gc_regions_calls.lock().unwrap(), + 1, + "Expected 1 call to gc_regions" + ); +} + +#[tokio::test] +async fn test_tracker_cleanup() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + + // Create region stat with proper file_removed_cnt to ensure it gets selected as candidate + let mut region_stat = + mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB + if let RegionManifestInfo::Mito { + file_removed_cnt, .. 
+ } = &mut region_stat.region_manifest + { + *file_removed_cnt = 5; + } + + let table_stats = HashMap::from([(table_id, vec![region_stat])]); + + let mut gc_reports = HashMap::new(); + gc_reports.insert(region_id, new_empty_report_with([region_id])); + + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region_id, 1)]), + ..Default::default() + }; + + let ctx = Arc::new( + MockSchedulerCtx { + table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))), + gc_reports: Arc::new(Mutex::new(gc_reports)), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer)]), + )])), + ); + + let old_region_gc_tracker = { + let mut tracker = HashMap::new(); + tracker.insert( + region_id, + crate::gc::tracker::RegionGcInfo { + last_full_listing_time: Some(Instant::now() - Duration::from_secs(7200)), // 2 hours ago + last_gc_time: Instant::now() - Duration::from_secs(7200), // 2 hours ago + }, + ); + // also insert a different table that should also be cleaned up + tracker.insert( + RegionId::new(2, 1), + crate::gc::tracker::RegionGcInfo { + last_full_listing_time: Some(Instant::now() - Duration::from_secs(7200)), // 2 hours ago + last_gc_time: Instant::now() - Duration::from_secs(7200), // 2 hours ago + }, + ); + tracker + }; + + // Use a custom config with shorter cleanup interval to trigger cleanup + let config = GcSchedulerOptions { + // 30 minutes + tracker_cleanup_interval: Duration::from_secs(1800), + ..Default::default() + }; + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config, + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(old_region_gc_tracker)), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new( + Instant::now() - Duration::from_secs(3600), // Old cleanup time (1 hour ago) + )), + }; + + let report = scheduler.handle_tick().await.unwrap(); + + // Validate the returned GcJobReport - should have 1 datanode report + assert_eq!( + report.per_datanode_reports.len(), + 1, + "Should process 1 datanode" + ); + assert_eq!( + report.failed_datanodes.len(), + 0, + "Should have no failed datanodes" + ); + + // Get the datanode report + let datanode_report = report.per_datanode_reports.values().next().unwrap(); + + // Check that the region was processed successfully + assert!( + datanode_report.deleted_files.contains_key(®ion_id), + "Should have deleted files for region" + ); + assert!( + datanode_report.need_retry_regions.is_empty(), + "Should have no retry regions" + ); + + // Verify tracker was updated + let tracker = scheduler.region_gc_tracker.lock().await; + assert!( + tracker.contains_key(®ion_id), + "Tracker should contain region {}", + region_id + ); + // only one entry + assert_eq!(tracker.len(), 1, "Tracker should only have 1 entry"); +} diff --git a/src/meta-srv/src/gc/mock/misc.rs b/src/meta-srv/src/gc/mock/misc.rs new file mode 100644 index 0000000000..eb5a9de2c2 --- /dev/null +++ b/src/meta-srv/src/gc/mock/misc.rs @@ -0,0 +1,155 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use common_meta::peer::Peer; +use common_telemetry::init_default_ut_logging; +use store_api::storage::{FileRefsManifest, GcReport, RegionId}; + +use crate::gc::mock::{MockSchedulerCtx, new_candidate}; +use crate::gc::{GcScheduler, GcSchedulerOptions}; + +/// Edge Case Tests + +#[tokio::test] +async fn test_empty_file_refs_manifest() { + init_default_ut_logging(); + + let table_id = 1; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::new(1, ""); + let candidates = HashMap::from([(table_id, vec![new_candidate(region_id, 1.0)])]); + + // Empty file refs manifest + let file_refs = FileRefsManifest::default(); + + let ctx = Arc::new( + MockSchedulerCtx { + file_refs: Arc::new(Mutex::new(Some(file_refs))), + candidates: Arc::new(Mutex::new(Some(candidates))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + (table_id, vec![(region_id, peer)]), + )])), + ); + + let scheduler = GcScheduler { + ctx: ctx.clone(), + receiver: GcScheduler::channel().1, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let candidates = ctx.candidates.lock().unwrap().clone().unwrap_or_default(); + + // Convert table-based candidates to datanode-based candidates + let peer = Peer::new(1, ""); + let datanode_to_candidates = HashMap::from([( + peer, + candidates + .into_iter() + .flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c))) + .collect(), + )]); + + let report = scheduler + .parallel_process_datanodes(datanode_to_candidates) + .await; + + assert_eq!(report.per_datanode_reports.len(), 1); + assert_eq!(report.failed_datanodes.len(), 0); + // Should handle empty file refs gracefully +} + +#[tokio::test] +async fn test_multiple_regions_per_table() { + init_default_ut_logging(); + + let table_id = 1; + let region1 = RegionId::new(table_id, 1); + let region2 = RegionId::new(table_id, 2); + let region3 = RegionId::new(table_id, 3); + let peer = Peer::new(1, ""); + + let candidates = HashMap::from([( + table_id, + vec![ + new_candidate(region1, 1.0), + new_candidate(region2, 2.0), + new_candidate(region3, 3.0), + ], + )]); + + let mut gc_reports = HashMap::new(); + gc_reports.insert(region1, GcReport::default()); + gc_reports.insert(region2, GcReport::default()); + gc_reports.insert(region3, GcReport::default()); + + let file_refs = FileRefsManifest { + manifest_version: HashMap::from([(region1, 1), (region2, 1), (region3, 1)]), + ..Default::default() + }; + + let ctx = Arc::new( + MockSchedulerCtx { + gc_reports: Arc::new(Mutex::new(gc_reports)), + file_refs: Arc::new(Mutex::new(Some(file_refs))), + candidates: Arc::new(Mutex::new(Some(candidates))), + ..Default::default() + } + .with_table_routes(HashMap::from([( + table_id, + ( + table_id, + vec![ + (region1, peer.clone()), + (region2, peer.clone()), + (region3, peer.clone()), + ], + ), + )])), + ); + + let scheduler = GcScheduler { + ctx: 
ctx.clone(), + receiver: GcScheduler::channel().1, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), + }; + + let candidates = ctx.candidates.lock().unwrap().clone().unwrap_or_default(); + + // Convert table-based candidates to datanode-based candidates + let datanode_to_candidates = HashMap::from([( + peer.clone(), + candidates + .into_iter() + .flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c))) + .collect(), + )]); + + let report = scheduler + .parallel_process_datanodes(datanode_to_candidates) + .await; + + assert_eq!(report.per_datanode_reports.len(), 1); + assert_eq!(report.failed_datanodes.len(), 0); +}
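+
+// Illustrative sketch only (hypothetical helper, not introduced by this patch):
+// several tests above repeat the same conversion from table-based candidates
+// into the per-datanode map expected by `parallel_process_datanodes`. Assuming
+// the candidate type is named `GcCandidate` and table ids are plain `u32`
+// (both names assumed here for illustration), the pattern could be factored
+// out like this:
+//
+// fn to_datanode_candidates(
+//     peer: Peer,
+//     candidates: HashMap<u32, Vec<GcCandidate>>,
+// ) -> HashMap<Peer, Vec<(u32, GcCandidate)>> {
+//     // Flatten the per-table candidate lists into (table_id, candidate) pairs
+//     // and attribute all of them to the single peer used by these tests.
+//     HashMap::from([(
+//         peer,
+//         candidates
+//             .into_iter()
+//             .flat_map(|(table_id, cs)| cs.into_iter().map(move |c| (table_id, c)))
+//             .collect(),
+//     )])
+// }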