test: gc worker scheduler mock test (#7292)

* feat: gc worker only on local region

Signed-off-by: discord9 <discord9@163.com>

feat: gc scheduler

wip: gc trigger

Signed-off-by: discord9 <discord9@163.com>

feat: dn file removal rate

Signed-off-by: discord9 <discord9@163.com>

feat: trigger gc with stats(WIP)

Signed-off-by: discord9 <discord9@163.com>

chore

Signed-off-by: discord9 <discord9@163.com>

also move files ref manifest to store-api

Signed-off-by: discord9 <discord9@163.com>

feat: basic gc trigger impl

Signed-off-by: discord9 <discord9@163.com>

wip: handle file ref change

Signed-off-by: discord9 <discord9@163.com>

refactor: use region ids

Signed-off-by: discord9 <discord9@163.com>

fix: retry using related regions

Signed-off-by: discord9 <discord9@163.com>

chore: rm unused

Signed-off-by: discord9 <discord9@163.com>

fix: update file reference type in GC worker

Signed-off-by: discord9 <discord9@163.com>

feat: dn gc limiter

Signed-off-by: discord9 <discord9@163.com>

rename

Signed-off-by: discord9 <discord9@163.com>

feat: gc scheduler retry with outdated regions

Signed-off-by: discord9 <discord9@163.com>

feat: use real object store purger

Signed-off-by: discord9 <discord9@163.com>

wip: add to metasrv

Signed-off-by: discord9 <discord9@163.com>

feat: add to metasrv

Signed-off-by: discord9 <discord9@163.com>

feat: datanode gc worker handler

Signed-off-by: discord9 <discord9@163.com>

fix: no partition col fix

Signed-off-by: discord9 <discord9@163.com>

fix: RegionId json deser workaround

Signed-off-by: discord9 <discord9@163.com>

fix: find access layer

Signed-off-by: discord9 <discord9@163.com>

fix: on host dn

Signed-off-by: discord9 <discord9@163.com>

fix: stat dedup

Signed-off-by: discord9 <discord9@163.com>

refactor: rm load-based

Signed-off-by: discord9 <discord9@163.com>

chore: aft rebase fix

Signed-off-by: discord9 <discord9@163.com>

feat: not full scan

Signed-off-by: discord9 <discord9@163.com>

chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

feat: clean tracker

Signed-off-by: discord9 <discord9@163.com>

after rebase fix

Signed-off-by: discord9 <discord9@163.com>

clippy

Signed-off-by: discord9 <discord9@163.com>

refactor: split gc scheduler

Signed-off-by: discord9 <discord9@163.com>

feat: smaller linger time

Signed-off-by: discord9 <discord9@163.com>

feat: parallel region gc instr

Signed-off-by: discord9 <discord9@163.com>

chore: rename

Signed-off-by: discord9 <discord9@163.com>

chore: rename

Signed-off-by: discord9 <discord9@163.com>

enable is false

Signed-off-by: discord9 <discord9@163.com>

feat: update removed files precisely

Signed-off-by: discord9 <discord9@163.com>

all default to false&use local file purger

Signed-off-by: discord9 <discord9@163.com>

feat: not evict if gc enabled

Signed-off-by: discord9 <discord9@163.com>

per review

Signed-off-by: discord9 <discord9@163.com>

fix: pass gc config in mito&test: after truncate gc

Signed-off-by: discord9 <discord9@163.com>

WIP: one more test

Signed-off-by: discord9 <discord9@163.com>

test: basic compact

Signed-off-by: discord9 <discord9@163.com>

test: compact with ref

Signed-off-by: discord9 <discord9@163.com>

refactor: for easier mock

Signed-off-by: discord9 <discord9@163.com>

docs: explain race condition

Signed-off-by: discord9 <discord9@163.com>

feat: gc region procedure

Signed-off-by: discord9 <discord9@163.com>

refactor: ctx send gc/ref instr with procedure

Signed-off-by: discord9 <discord9@163.com>

fix: config deser to default

Signed-off-by: discord9 <discord9@163.com>

refactor: gc report

Signed-off-by: discord9 <discord9@163.com>

wip: async index file rm

Signed-off-by: discord9 <discord9@163.com>

fixme?

Signed-off-by: discord9 <discord9@163.com>

typo

Signed-off-by: discord9 <discord9@163.com>

more ut

Signed-off-by: discord9 <discord9@163.com>

test: more mock test

Signed-off-by: discord9 <discord9@163.com>

more

Signed-off-by: discord9 <discord9@163.com>

refactor: split mock test

Signed-off-by: discord9 <discord9@163.com>

clippy

Signed-off-by: discord9 <discord9@163.com>

refactor: rm stuff

Signed-off-by: discord9 <discord9@163.com>

test: mock add gc report per region

Signed-off-by: discord9 <discord9@163.com>

fix: stricter table failure condition

Signed-off-by: discord9 <discord9@163.com>

stuff

Signed-off-by: discord9 <discord9@163.com>

feat: can do different table gc same time&more todos

Signed-off-by: discord9 <discord9@163.com>

after rebase check

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

* wip: refactoring test

Signed-off-by: discord9 <discord9@163.com>

* fix: also get from follower peer

Signed-off-by: discord9 <discord9@163.com>

* test: update mock test

Signed-off-by: discord9 <discord9@163.com>

* revert some change&clean up

Signed-off-by: discord9 <discord9@163.com>

* typo

Signed-off-by: discord9 <discord9@163.com>

* chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* chore: more fix

Signed-off-by: discord9 <discord9@163.com>

* revert

Signed-off-by: discord9 <discord9@163.com>

* revert change to handler.rs

Signed-off-by: discord9 <discord9@163.com>

* test: fix mock test

Signed-off-by: discord9 <discord9@163.com>

* chore: rm retry

Signed-off-by: discord9 <discord9@163.com>

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: discord9 <discord9@163.com>

* after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
discord9
2025-12-04 20:00:25 +08:00
committed by GitHub
parent f2288a86b0
commit f02bdf5428
10 changed files with 2699 additions and 0 deletions


@@ -23,6 +23,8 @@ use store_api::storage::RegionId;
mod candidate;
mod ctx;
mod handler;
#[cfg(test)]
mod mock;
mod options;
mod procedure;
mod scheduler;

src/meta-srv/src/gc/mock.rs

@@ -0,0 +1,458 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod basic;
mod candidate_select;
mod con;
mod config;
mod err_handle;
mod full_list;
mod integration;
mod misc;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use common_meta::datanode::{RegionManifestInfo, RegionStat};
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_telemetry::debug;
use ordered_float::OrderedFloat;
use store_api::region_engine::RegionRole;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;
use tokio::sync::mpsc::Sender;
use crate::error::{Result, UnexpectedSnafu};
use crate::gc::candidate::GcCandidate;
use crate::gc::ctx::SchedulerCtx;
use crate::gc::handler::Region2Peers;
use crate::gc::options::GcSchedulerOptions;
use crate::gc::scheduler::{Event, GcScheduler};
pub const TEST_REGION_SIZE_200MB: u64 = 200_000_000;

/// Helper function to create an empty GcReport for the given region IDs
pub fn new_empty_report_with(region_ids: impl IntoIterator<Item = RegionId>) -> GcReport {
    let mut deleted_files = HashMap::new();
    for region_id in region_ids {
        deleted_files.insert(region_id, vec![]);
    }
    GcReport {
        deleted_files,
        need_retry_regions: HashSet::new(),
    }
}
#[allow(clippy::type_complexity)]
#[derive(Debug, Default)]
pub struct MockSchedulerCtx {
    pub table_to_region_stats: Arc<Mutex<Option<HashMap<TableId, Vec<RegionStat>>>>>,
    pub table_routes: Arc<Mutex<HashMap<TableId, (TableId, PhysicalTableRouteValue)>>>,
    pub file_refs: Arc<Mutex<Option<FileRefsManifest>>>,
    pub gc_reports: Arc<Mutex<HashMap<RegionId, GcReport>>>,
    pub candidates: Arc<Mutex<Option<HashMap<TableId, Vec<GcCandidate>>>>>,
    pub get_table_to_region_stats_calls: Arc<Mutex<usize>>,
    pub get_file_references_calls: Arc<Mutex<usize>>,
    pub gc_regions_calls: Arc<Mutex<usize>>,
    // Error injection fields for testing
    pub get_table_to_region_stats_error: Arc<Mutex<Option<crate::error::Error>>>,
    pub get_table_route_error: Arc<Mutex<Option<crate::error::Error>>>,
    pub get_file_references_error: Arc<Mutex<Option<crate::error::Error>>>,
    pub gc_regions_error: Arc<Mutex<Option<crate::error::Error>>>,
    // Retry testing fields
    pub gc_regions_retry_count: Arc<Mutex<HashMap<RegionId, usize>>>,
    pub gc_regions_error_sequence: Arc<Mutex<Vec<crate::error::Error>>>,
    pub gc_regions_success_after_retries: Arc<Mutex<HashMap<RegionId, usize>>>,
    // Per-region error injection
    pub gc_regions_per_region_errors: Arc<Mutex<HashMap<RegionId, crate::error::Error>>>,
}
impl MockSchedulerCtx {
    pub fn with_table_routes(
        self,
        table_routes: HashMap<TableId, (TableId, Vec<(RegionId, Peer)>)>,
    ) -> Self {
        *self.table_routes.lock().unwrap() = table_routes
            .into_iter()
            .map(|(k, (phy_id, region2peer))| {
                let phy = PhysicalTableRouteValue::new(
                    region2peer
                        .into_iter()
                        .map(|(region_id, peer)| RegionRoute {
                            region: Region::new_test(region_id),
                            leader_peer: Some(peer),
                            ..Default::default()
                        })
                        .collect(),
                );
                (k, (phy_id, phy))
            })
            .collect();
        self
    }

    /// Set an error to be returned by `get_table_to_region_stats`
    #[allow(dead_code)]
    pub fn with_get_table_to_region_stats_error(self, error: crate::error::Error) -> Self {
        *self.get_table_to_region_stats_error.lock().unwrap() = Some(error);
        self
    }

    /// Set an error to be returned by `get_table_route`
    pub fn set_table_route_error(&self, error: crate::error::Error) {
        *self.get_table_route_error.lock().unwrap() = Some(error);
    }

    /// Set an error to be returned by `get_file_references`
    #[allow(dead_code)]
    pub fn with_get_file_references_error(self, error: crate::error::Error) -> Self {
        *self.get_file_references_error.lock().unwrap() = Some(error);
        self
    }

    /// Set an error to be returned by `gc_regions`
    pub fn with_gc_regions_error(self, error: crate::error::Error) -> Self {
        *self.gc_regions_error.lock().unwrap() = Some(error);
        self
    }

    /// Set a sequence of errors to be returned by `gc_regions` for retry testing
    pub fn set_gc_regions_error_sequence(&self, errors: Vec<crate::error::Error>) {
        *self.gc_regions_error_sequence.lock().unwrap() = errors;
    }

    /// Set success after a specific number of retries for a region
    pub fn set_gc_regions_success_after_retries(&self, region_id: RegionId, retries: usize) {
        self.gc_regions_success_after_retries
            .lock()
            .unwrap()
            .insert(region_id, retries);
    }

    /// Get the retry count for a specific region
    pub fn get_retry_count(&self, region_id: RegionId) -> usize {
        self.gc_regions_retry_count
            .lock()
            .unwrap()
            .get(&region_id)
            .copied()
            .unwrap_or(0)
    }

    /// Reset all retry tracking
    pub fn reset_retry_tracking(&self) {
        *self.gc_regions_retry_count.lock().unwrap() = HashMap::new();
        *self.gc_regions_error_sequence.lock().unwrap() = Vec::new();
        *self.gc_regions_success_after_retries.lock().unwrap() = HashMap::new();
    }

    /// Set an error to be returned for a specific region
    pub fn set_gc_regions_error_for_region(&self, region_id: RegionId, error: crate::error::Error) {
        self.gc_regions_per_region_errors
            .lock()
            .unwrap()
            .insert(region_id, error);
    }

    /// Clear per-region errors
    #[allow(unused)]
    pub fn clear_gc_regions_per_region_errors(&self) {
        self.gc_regions_per_region_errors.lock().unwrap().clear();
    }
}
#[async_trait::async_trait]
impl SchedulerCtx for MockSchedulerCtx {
    async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
        *self.get_table_to_region_stats_calls.lock().unwrap() += 1;
        // Return the injected error, if any; `take()` makes it one-shot
        if let Some(error) = self.get_table_to_region_stats_error.lock().unwrap().take() {
            return Err(error);
        }
        Ok(self
            .table_to_region_stats
            .lock()
            .unwrap()
            .clone()
            .unwrap_or_default())
    }

    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)> {
        // Return the injected error, if any; `take()` makes it one-shot
        if let Some(error) = self.get_table_route_error.lock().unwrap().take() {
            return Err(error);
        }
        Ok(self
            .table_routes
            .lock()
            .unwrap()
            .get(&table_id)
            .cloned()
            .unwrap_or_else(|| (table_id, PhysicalTableRouteValue::default())))
    }

    async fn get_file_references(
        &self,
        query_regions: &[RegionId],
        _related_regions: HashMap<RegionId, Vec<RegionId>>,
        region_to_peer: &Region2Peers,
        _timeout: Duration,
    ) -> Result<FileRefsManifest> {
        *self.get_file_references_calls.lock().unwrap() += 1;
        // Return the injected error, if any; `take()` makes it one-shot
        if let Some(error) = self.get_file_references_error.lock().unwrap().take() {
            return Err(error);
        }
        // Every queried region must have a known peer
        if query_regions
            .iter()
            .any(|region_id| !region_to_peer.contains_key(region_id))
        {
            UnexpectedSnafu {
                violated: format!(
                    "region_to_peer {region_to_peer:?} does not contain all region_ids requested: {query_regions:?}"
                ),
            }
            .fail()?;
        }
        Ok(self.file_refs.lock().unwrap().clone().unwrap_or_default())
    }
    async fn gc_regions(
        &self,
        _peer: Peer,
        region_ids: &[RegionId],
        _file_refs_manifest: &FileRefsManifest,
        _full_file_listing: bool,
        _timeout: Duration,
    ) -> Result<GcReport> {
        *self.gc_regions_calls.lock().unwrap() += 1;
        // Check per-region error injection first (for any region)
        for &region_id in region_ids {
            if let Some(error) = self
                .gc_regions_per_region_errors
                .lock()
                .unwrap()
                .remove(&region_id)
            {
                *self
                    .gc_regions_retry_count
                    .lock()
                    .unwrap()
                    .entry(region_id)
                    .or_insert(0) += 1;
                return Err(error);
            }
        }
        // Check if we should return an injected error
        if let Some(error) = self.gc_regions_error.lock().unwrap().take() {
            for region_id in region_ids {
                *self
                    .gc_regions_retry_count
                    .lock()
                    .unwrap()
                    .entry(*region_id)
                    .or_insert(0) += 1;
            }
            return Err(error);
        }
        // Handle error sequence for retry testing
        {
            let mut error_sequence = self.gc_regions_error_sequence.lock().unwrap();
            if !error_sequence.is_empty() {
                let error = error_sequence.remove(0);
                for region_id in region_ids {
                    *self
                        .gc_regions_retry_count
                        .lock()
                        .unwrap()
                        .entry(*region_id)
                        .or_insert(0) += 1;
                }
                return Err(error);
            }
        }
        // Build the final report by processing each region individually
        let mut final_report = GcReport::default();
        let gc_reports = self.gc_reports.lock().unwrap();
        let success_after_retries = self.gc_regions_success_after_retries.lock().unwrap();
        for &region_id in region_ids {
            // Get current retry count for this region
            let retry_count = self
                .gc_regions_retry_count
                .lock()
                .unwrap()
                .get(&region_id)
                .copied()
                .unwrap_or(0);
            // Check if this region should succeed or need retry
            if let Some(&required_retries) = success_after_retries.get(&region_id) {
                if retry_count < required_retries {
                    debug!(
                        "Region {} needs retry (attempt {}/{})",
                        region_id,
                        retry_count + 1,
                        required_retries
                    );
                    // This region needs more retries - add to need_retry_regions
                    final_report.need_retry_regions.insert(region_id);
                    // Track the retry attempt
                    let mut retry_count_map = self.gc_regions_retry_count.lock().unwrap();
                    *retry_count_map.entry(region_id).or_insert(0) += 1;
                } else {
                    debug!(
                        "Region {} has completed retries - succeeding now",
                        region_id
                    );
                    // This region has completed all required retries - succeed
                    if let Some(report) = gc_reports.get(&region_id) {
                        final_report.merge(report.clone());
                    }
                    // Track the success attempt
                    let mut retry_count_map = self.gc_regions_retry_count.lock().unwrap();
                    *retry_count_map.entry(region_id).or_insert(0) += 1;
                }
            } else {
                // No retry requirement - check if we have a GC report for this region
                if let Some(report) = gc_reports.get(&region_id) {
                    // We have a GC report - succeed immediately
                    final_report.merge(report.clone());
                    // Track the success attempt
                    let mut retry_count_map = self.gc_regions_retry_count.lock().unwrap();
                    *retry_count_map.entry(region_id).or_insert(0) += 1;
                } else {
                    // No GC report available - this region should be marked for retry
                    final_report.need_retry_regions.insert(region_id);
                    // Track the attempt
                    let mut retry_count_map = self.gc_regions_retry_count.lock().unwrap();
                    *retry_count_map.entry(region_id).or_insert(0) += 1;
                }
            }
        }
        // Return the report with need_retry_regions populated - let the caller handle retry logic
        Ok(final_report)
    }
}
pub struct TestEnv {
    pub scheduler: GcScheduler,
    pub ctx: Arc<MockSchedulerCtx>,
    #[allow(dead_code)]
    tx: Sender<Event>,
}

#[allow(unused)]
impl TestEnv {
    pub fn new() -> Self {
        let ctx = Arc::new(MockSchedulerCtx::default());
        let (tx, rx) = GcScheduler::channel();
        let config = GcSchedulerOptions::default();
        let scheduler = GcScheduler {
            ctx: ctx.clone(),
            receiver: rx,
            config,
            region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
            last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
        };
        Self { scheduler, ctx, tx }
    }

    pub fn with_candidates(self, candidates: HashMap<TableId, Vec<GcCandidate>>) -> Self {
        *self.ctx.candidates.lock().unwrap() = Some(candidates);
        self
    }

    #[allow(dead_code)]
    pub async fn run_scheduler(mut self) {
        self.scheduler.run().await;
    }

    #[allow(dead_code)]
    pub async fn tick(&self) {
        self.tx.send(Event::Tick).await.unwrap();
    }
}
/// Helper function to create a mock GC candidate that will pass the GC threshold
fn new_candidate(region_id: RegionId, score: f64) -> GcCandidate {
    // will pass threshold for gc
    let region_stat = mock_region_stat(region_id, RegionRole::Leader, 10_000, 10);
    GcCandidate {
        region_id,
        score: OrderedFloat(score),
        region_stat,
    }
}

/// Helper function to create a mock GC candidate
fn mock_candidate(region_id: RegionId) -> GcCandidate {
    let region_stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10);
    GcCandidate {
        region_id,
        score: OrderedFloat(1.0),
        region_stat,
    }
}

/// Helper function to create a mock RegionStat
fn mock_region_stat(
    id: RegionId,
    role: RegionRole,
    approximate_bytes: u64,
    sst_num: u64,
) -> RegionStat {
    RegionStat {
        id,
        role,
        approximate_bytes,
        sst_num,
        region_manifest: RegionManifestInfo::Mito {
            manifest_version: 0,
            flushed_entry_id: 0,
            file_removed_cnt: 0,
        },
        rcus: 0,
        wcus: 0,
        engine: "mito".to_string(),
        num_rows: 0,
        memtable_size: 0,
        manifest_size: 0,
        sst_size: 0,
        index_size: 0,
        data_topic_latest_entry_id: 0,
        metadata_topic_latest_entry_id: 0,
        written_bytes: 0,
    }
}
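
The one-shot error injection that `MockSchedulerCtx` relies on (an `Option` behind a `Mutex`, consumed with `take()`, next to a call counter) can be sketched in isolation. The names below (`MockCtx`, `inject_error`, `fetch_stats`, `call_count`) are hypothetical, and a plain `String` stands in for `crate::error::Error`; this is a minimal illustration of the pattern under those assumptions, not code from this commit.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for a mock context; not part of the commit.
#[derive(Default)]
pub struct MockCtx {
    /// Number of times `fetch_stats` has been called.
    calls: Mutex<usize>,
    /// Error returned by the next call only; `take()` clears it.
    injected_error: Mutex<Option<String>>,
}

impl MockCtx {
    /// Arm a one-shot error for the next `fetch_stats` call.
    pub fn inject_error(&self, error: &str) {
        *self.injected_error.lock().unwrap() = Some(error.to_string());
    }

    /// Count the call, then fail once if an error was injected.
    pub fn fetch_stats(&self) -> Result<u64, String> {
        *self.calls.lock().unwrap() += 1;
        if let Some(error) = self.injected_error.lock().unwrap().take() {
            return Err(error);
        }
        // Placeholder payload; a real mock would return canned stats.
        Ok(42)
    }

    /// Number of calls observed so far, including failed ones.
    pub fn call_count(&self) -> usize {
        *self.calls.lock().unwrap()
    }
}
```

Because `take()` leaves `None` behind, a test that wants two consecutive failures has to arm the error twice; that is presumably why the mock also offers an error sequence for retry tests.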


@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use common_meta::peer::Peer;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionRole;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use crate::gc::mock::{
MockSchedulerCtx, TEST_REGION_SIZE_200MB, TestEnv, mock_region_stat, new_candidate,
};
use crate::gc::{GcScheduler, GcSchedulerOptions};
#[tokio::test]
async fn test_parallel_process_datanodes_empty() {
    let env = TestEnv::new();
    let report = env
        .scheduler
        .parallel_process_datanodes(HashMap::new())
        .await;
    assert_eq!(report.per_datanode_reports.len(), 0);
    assert_eq!(report.failed_datanodes.len(), 0);
}
#[tokio::test]
async fn test_parallel_process_datanodes_with_candidates() {
    init_default_ut_logging();
    let table_id = 1;
    let region_id = RegionId::new(table_id, 1);
    let peer = Peer::new(1, "");
    let candidates = HashMap::from([(table_id, vec![new_candidate(region_id, 1.0)])]);
    let mut gc_reports = HashMap::new();
    let deleted_files = vec![FileId::random()];
    gc_reports.insert(
        region_id,
        GcReport {
            deleted_files: HashMap::from([(region_id, deleted_files.clone())]),
            ..Default::default()
        },
    );
    let file_refs = FileRefsManifest {
        manifest_version: HashMap::from([(region_id, 1)]),
        ..Default::default()
    };
    let ctx = MockSchedulerCtx {
        gc_reports: Arc::new(Mutex::new(gc_reports)),
        file_refs: Arc::new(Mutex::new(Some(file_refs))),
        ..Default::default()
    }
    .with_table_routes(HashMap::from([(
        table_id,
        (table_id, vec![(region_id, peer.clone())]),
    )]));
    let env = TestEnv::new();
    // We need to replace the ctx with the one with gc_reports
    let mut scheduler = env.scheduler;
    scheduler.ctx = Arc::new(ctx);
    // Convert table-based candidates to datanode-based candidates
    let datanode_to_candidates = HashMap::from([(
        peer,
        candidates
            .into_iter()
            .flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c)))
            .collect(),
    )]);
    let report = scheduler
        .parallel_process_datanodes(datanode_to_candidates)
        .await;
    assert_eq!(report.per_datanode_reports.len(), 1);
    assert_eq!(report.failed_datanodes.len(), 0);
}
#[tokio::test]
async fn test_handle_tick() {
    init_default_ut_logging();
    let table_id = 1;
    let region_id = RegionId::new(table_id, 1);
    let peer = Peer::new(1, "");
    let candidates = HashMap::from([(table_id, vec![new_candidate(region_id, 1.0)])]);
    let mut gc_reports = HashMap::new();
    gc_reports.insert(region_id, GcReport::default());
    let file_refs = FileRefsManifest {
        manifest_version: HashMap::from([(region_id, 1)]),
        ..Default::default()
    };
    let ctx = Arc::new(
        MockSchedulerCtx {
            table_to_region_stats: Arc::new(Mutex::new(Some(HashMap::from([(
                table_id,
                vec![mock_region_stat(
                    region_id,
                    RegionRole::Leader,
                    TEST_REGION_SIZE_200MB,
                    10,
                )],
            )])))),
            gc_reports: Arc::new(Mutex::new(gc_reports)),
            candidates: Arc::new(Mutex::new(Some(candidates))),
            file_refs: Arc::new(Mutex::new(Some(file_refs))),
            ..Default::default()
        }
        .with_table_routes(HashMap::from([(
            table_id,
            (table_id, vec![(region_id, peer)]),
        )])),
    );
    let scheduler = GcScheduler {
        ctx: ctx.clone(),
        receiver: GcScheduler::channel().1,
        config: GcSchedulerOptions::default(),
        region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
        last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
    };
    let report = scheduler.handle_tick().await.unwrap();
    // Validate the returned GcJobReport
    assert_eq!(
        report.per_datanode_reports.len(),
        1,
        "Should process 1 datanode"
    );
    assert_eq!(
        report.failed_datanodes.len(),
        0,
        "Should have 0 failed datanodes"
    );
    assert_eq!(*ctx.get_table_to_region_stats_calls.lock().unwrap(), 1);
    assert_eq!(*ctx.get_file_references_calls.lock().unwrap(), 1);
    assert_eq!(*ctx.gc_regions_calls.lock().unwrap(), 1);
    let tracker = scheduler.region_gc_tracker.lock().await;
    assert!(
        tracker.contains_key(&region_id),
        "Tracker should have one region: {:?}",
        tracker
    );
}
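
The regrouping step in `test_parallel_process_datanodes_with_candidates` (turning a table-keyed map of candidates into a flat list of `(table_id, candidate)` pairs for one datanode) can be shown on its own. The function name `flatten_by_table` and the use of `u32` table IDs with a generic candidate type are assumptions made for this sketch; the test above does the same thing inline with the real `TableId` and `GcCandidate` types.

```rust
use std::collections::HashMap;

/// Flatten `table_id -> candidates` into `(table_id, candidate)` pairs.
/// Hypothetical helper; the commit performs this inline in the test.
pub fn flatten_by_table<C>(by_table: HashMap<u32, Vec<C>>) -> Vec<(u32, C)> {
    by_table
        .into_iter()
        // `move` copies `table_id` into the inner closure so each
        // candidate can be tagged with the table it came from.
        .flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c)))
        .collect()
}
```

`HashMap` iteration order is unspecified, so callers that compare the result against a fixed list should sort it first.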


@@ -0,0 +1,390 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use common_meta::datanode::RegionManifestInfo;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use crate::gc::mock::{MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat};
use crate::gc::{GcScheduler, GcSchedulerOptions};
/// Candidate Selection Tests
#[tokio::test]
async fn test_gc_candidate_filtering_by_role() {
    init_default_ut_logging();
    let table_id = 1;
    let leader_region = RegionId::new(table_id, 1);
    let follower_region = RegionId::new(table_id, 2);
    let mut leader_stat = mock_region_stat(
        leader_region,
        RegionRole::Leader,
        TEST_REGION_SIZE_200MB,
        10,
    ); // 200MB
    let mut follower_stat = mock_region_stat(
        follower_region,
        RegionRole::Follower,
        TEST_REGION_SIZE_200MB,
        10,
    ); // 200MB
    // Set up manifest info for scoring
    if let RegionManifestInfo::Mito {
        file_removed_cnt, ..
    } = &mut leader_stat.region_manifest
    {
        *file_removed_cnt = 5;
    }
    if let RegionManifestInfo::Mito {
        file_removed_cnt, ..
    } = &mut follower_stat.region_manifest
    {
        *file_removed_cnt = 5;
    }
    let table_stats = HashMap::from([(table_id, vec![leader_stat.clone(), follower_stat.clone()])]);
    let ctx = Arc::new(MockSchedulerCtx {
        table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
        ..Default::default()
    });
    let scheduler = GcScheduler {
        ctx: ctx.clone(),
        receiver: GcScheduler::channel().1,
        config: GcSchedulerOptions::default(),
        region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
        last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
    };
    let stats = ctx
        .table_to_region_stats
        .lock()
        .unwrap()
        .clone()
        .unwrap_or_default();
    let candidates = scheduler.select_gc_candidates(&stats).await.unwrap();
    // Should only select leader regions
    assert_eq!(
        candidates.len(),
        1,
        "Expected 1 table with candidates, got {}",
        candidates.len()
    );
    if let Some(table_candidates) = candidates.get(&table_id) {
        assert_eq!(
            table_candidates.len(),
            1,
            "Expected 1 candidate for table {}, got {}",
            table_id,
            table_candidates.len()
        );
        assert_eq!(
            table_candidates[0].region_id, leader_region,
            "Expected leader region {}, got {}",
            leader_region, table_candidates[0].region_id
        );
    } else {
        panic!("Expected table {} to have candidates", table_id);
    }
}
#[tokio::test]
async fn test_gc_candidate_size_threshold() {
    init_default_ut_logging();
    let table_id = 1;
    let small_region = RegionId::new(table_id, 1);
    let large_region = RegionId::new(table_id, 2);
    let mut small_stat = mock_region_stat(small_region, RegionRole::Leader, 50_000_000, 5); // 50MB
    if let RegionManifestInfo::Mito {
        file_removed_cnt, ..
    } = &mut small_stat.region_manifest
    {
        *file_removed_cnt = 3;
    }
    let mut large_stat =
        mock_region_stat(large_region, RegionRole::Leader, TEST_REGION_SIZE_200MB, 20); // 200MB
    if let RegionManifestInfo::Mito {
        file_removed_cnt, ..
    } = &mut large_stat.region_manifest
    {
        *file_removed_cnt = 5;
    }
    let table_stats = HashMap::from([(table_id, vec![small_stat, large_stat])]);
    let ctx = Arc::new(MockSchedulerCtx {
        table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
        ..Default::default()
    });
    let config = GcSchedulerOptions {
        min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default)
        ..Default::default()
    };
    let scheduler = GcScheduler {
        ctx: ctx.clone(),
        receiver: GcScheduler::channel().1,
        config,
        region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
        last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
    };
    let stats = ctx
        .table_to_region_stats
        .lock()
        .unwrap()
        .clone()
        .unwrap_or_default();
    let candidates = scheduler.select_gc_candidates(&stats).await.unwrap();
    // Should only select large region
    assert_eq!(
        candidates.len(),
        1,
        "Expected 1 table with candidates, got {}",
        candidates.len()
    );
    if let Some(table_candidates) = candidates.get(&table_id) {
        assert_eq!(
            table_candidates.len(),
            1,
            "Expected 1 candidate for table {}, got {}",
            table_id,
            table_candidates.len()
        );
        assert_eq!(
            table_candidates[0].region_id, large_region,
            "Expected large region {}, got {}",
            large_region, table_candidates[0].region_id
        );
    } else {
        panic!("Expected table {} to have candidates", table_id);
    }
}
#[tokio::test]
async fn test_gc_candidate_scoring() {
    init_default_ut_logging();
    let table_id = 1;
    let low_score_region = RegionId::new(table_id, 1);
    let high_score_region = RegionId::new(table_id, 2);
    let mut low_stat = mock_region_stat(
        low_score_region,
        RegionRole::Leader,
        TEST_REGION_SIZE_200MB,
        5,
    ); // 200MB
    // Set a low removed-file count for low_score_region
    if let RegionManifestInfo::Mito {
        file_removed_cnt, ..
    } = &mut low_stat.region_manifest
    {
        *file_removed_cnt = 2;
    }
    let mut high_stat = mock_region_stat(
        high_score_region,
        RegionRole::Leader,
        TEST_REGION_SIZE_200MB,
        50,
    ); // 200MB
    // Set a high removed-file count for high_score_region
    if let RegionManifestInfo::Mito {
        file_removed_cnt, ..
    } = &mut high_stat.region_manifest
    {
        *file_removed_cnt = 20;
    }
    let table_stats = HashMap::from([(table_id, vec![low_stat, high_stat])]);
    let ctx = Arc::new(MockSchedulerCtx {
        table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
        ..Default::default()
    });
    let config = GcSchedulerOptions {
        sst_count_weight: 1.0,
        file_removed_count_weight: 0.5,
        min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default)
        ..Default::default()
    };
    let scheduler = GcScheduler {
        ctx: ctx.clone(),
        receiver: GcScheduler::channel().1,
        config,
        region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
        last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
    };
    let stats = ctx
        .table_to_region_stats
        .lock()
        .unwrap()
        .clone()
        .unwrap_or_default();
    let candidates = scheduler.select_gc_candidates(&stats).await.unwrap();
    // Should select both regions but high score region should be first
    assert_eq!(
        candidates.len(),
        1,
        "Expected 1 table with candidates, got {}",
        candidates.len()
    );
    if let Some(table_candidates) = candidates.get(&table_id) {
        assert_eq!(
            table_candidates.len(),
            2,
            "Expected 2 candidates for table {}, got {}",
            table_id,
            table_candidates.len()
        );
        // Higher score region should come first (sorted by score descending)
        assert_eq!(
            table_candidates[0].region_id, high_score_region,
            "High score region should be first"
        );
        assert!(
            table_candidates[0].score > table_candidates[1].score,
            "High score region should have higher score: {} > {}",
            table_candidates[0].score,
            table_candidates[1].score
        );
    } else {
        panic!("Expected table {} to have candidates", table_id);
    }
}
#[tokio::test]
async fn test_gc_candidate_regions_per_table_threshold() {
    init_default_ut_logging();
    let table_id = 1;
    // Create 10 regions for the same table
    let mut region_stats = Vec::new();
    for i in 0..10 {
        let region_id = RegionId::new(table_id, i + 1);
        let mut stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 20); // 200MB
        // Set different removed-file counts to create different scores
        // Higher region numbers get higher scores (better GC candidates)
        if let RegionManifestInfo::Mito {
            file_removed_cnt, ..
        } = &mut stat.region_manifest
        {
            *file_removed_cnt = (i as u64 + 1) * 2; // Region 1: 2, Region 2: 4, ..., Region 10: 20
        }
        region_stats.push(stat);
    }
    let table_stats = HashMap::from([(table_id, region_stats)]);
    let ctx = Arc::new(MockSchedulerCtx {
        table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
        ..Default::default()
    });
    // Set regions_per_table_threshold to 3
    let config = GcSchedulerOptions {
        regions_per_table_threshold: 3,
        min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default)
        ..Default::default()
    };
    let scheduler = GcScheduler {
        ctx: ctx.clone(),
        receiver: GcScheduler::channel().1,
        config,
        region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
        last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
    };
    let stats = ctx
        .table_to_region_stats
        .lock()
        .unwrap()
        .clone()
        .unwrap_or_default();
    let candidates = scheduler.select_gc_candidates(&stats).await.unwrap();
    // Should have 1 table with candidates
    assert_eq!(
        candidates.len(),
        1,
        "Expected 1 table with candidates, got {}",
        candidates.len()
    );
    if let Some(table_candidates) = candidates.get(&table_id) {
        // Should only have 3 candidates due to regions_per_table_threshold
        assert_eq!(
            table_candidates.len(),
            3,
            "Expected 3 candidates for table {} due to regions_per_table_threshold, got {}",
            table_id,
            table_candidates.len()
        );
        // Verify that the top 3 scoring regions are selected
        // Regions 8, 9, 10 should have the highest scores (file_removed_cnt: 16, 18, 20)
        // They should be returned in descending order by score
        let expected_regions = vec![10, 9, 8];
        let actual_regions: Vec<u32> = table_candidates
            .iter()
            .map(|c| c.region_id.region_number())
            .collect();
        assert_eq!(
            actual_regions, expected_regions,
            "Expected regions {:?} to be selected, got {:?}",
            expected_regions, actual_regions
        );
        // Verify they are sorted by score in descending order
        for i in 0..table_candidates.len() - 1 {
            assert!(
                table_candidates[i].score >= table_candidates[i + 1].score,
                "Candidates should be sorted by score descending: {} >= {}",
                table_candidates[i].score,
                table_candidates[i + 1].score
            );
        }
    } else {
        panic!("Expected table {} to have candidates", table_id);
    }
}

@@ -0,0 +1,516 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_telemetry::{info, init_default_ut_logging};
use store_api::region_engine::RegionRole;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use crate::gc::mock::{
MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_candidate, mock_region_stat, new_candidate,
};
use crate::gc::{GcScheduler, GcSchedulerOptions};
// Concurrent Processing Tests
#[tokio::test]
async fn test_concurrent_table_processing_limits() {
init_default_ut_logging();
let mut candidates = HashMap::new();
let mut gc_reports = HashMap::new();
// Create many tables with candidates
for table_id in 1..=10 {
let region_id = RegionId::new(table_id, 1);
candidates.insert(table_id, vec![new_candidate(region_id, 1.0)]);
gc_reports.insert(
region_id,
GcReport {
deleted_files: HashMap::from([(region_id, vec![FileId::random()])]),
..Default::default()
},
);
}
let ctx = MockSchedulerCtx {
candidates: Arc::new(Mutex::new(Some(candidates))),
file_refs: Arc::new(Mutex::new(Some(FileRefsManifest {
manifest_version: (1..=10).map(|i| (RegionId::new(i, 1), 1)).collect(),
..Default::default()
}))),
gc_reports: Arc::new(Mutex::new(gc_reports)),
..Default::default()
}
.with_table_routes(
(1..=10)
.map(|table_id| {
let region_id = RegionId::new(table_id, 1);
(table_id, (table_id, vec![(region_id, Peer::new(1, ""))]))
})
.collect(),
);
let ctx = Arc::new(ctx);
let config = GcSchedulerOptions {
max_concurrent_tables: 3, // Set a low limit
retry_backoff_duration: Duration::from_millis(50), // for faster test
..Default::default()
};
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let candidates = ctx.candidates.lock().unwrap().clone().unwrap_or_default();
// Convert table-based candidates to datanode-based candidates
let peer = Peer::new(1, "");
let datanode_to_candidates = HashMap::from([(
peer,
candidates
.into_iter()
.flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c)))
.collect(),
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.await;
// Should process all datanodes
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
}
#[tokio::test]
async fn test_datanode_processes_tables_with_partial_gc_failures() {
init_default_ut_logging();
let table1 = 1;
let region1 = RegionId::new(table1, 1);
let table2 = 2;
let region2 = RegionId::new(table2, 1);
let peer = Peer::new(1, "");
let mut candidates = HashMap::new();
candidates.insert(table1, vec![new_candidate(region1, 1.0)]);
candidates.insert(table2, vec![new_candidate(region2, 1.0)]);
// Set up GC reports for success and failure
let mut gc_reports = HashMap::new();
gc_reports.insert(
region1,
GcReport {
deleted_files: HashMap::from([(region1, vec![])]),
..Default::default()
},
);
// region2 will have no GC report, simulating failure
let file_refs = FileRefsManifest {
manifest_version: HashMap::from([(region1, 1), (region2, 1)]),
..Default::default()
};
let ctx = Arc::new(
MockSchedulerCtx {
gc_reports: Arc::new(Mutex::new(gc_reports)),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
candidates: Arc::new(Mutex::new(Some(candidates))),
..Default::default()
}
.with_table_routes(HashMap::from([
(table1, (table1, vec![(region1, peer.clone())])),
(table2, (table2, vec![(region2, peer.clone())])),
])),
);
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config: GcSchedulerOptions::default(),
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let candidates = ctx.candidates.lock().unwrap().clone().unwrap_or_default();
// Convert table-based candidates to datanode-based candidates
let datanode_to_candidates = HashMap::from([(
peer,
candidates
.into_iter()
.flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c)))
.collect(),
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.await;
// Should have one datanode with mixed results
assert_eq!(report.per_datanode_reports.len(), 1);
// Exactly one region should need a retry: region2 has no GC report, so it lands in need_retry_regions
let datanode_report = report.per_datanode_reports.values().next().unwrap();
assert_eq!(datanode_report.need_retry_regions.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
}
// Region Concurrency Tests
#[tokio::test]
async fn test_region_gc_concurrency_limit() {
init_default_ut_logging();
let table_id = 1;
let peer = Peer::new(1, "");
// Create multiple regions for the same table
let mut region_stats = Vec::new();
let mut candidates = Vec::new();
let mut gc_reports = HashMap::new();
for i in 1..=10 {
let region_id = RegionId::new(table_id, i as u32);
let region_stat =
mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
region_stats.push(region_stat);
candidates.push(mock_candidate(region_id));
gc_reports.insert(
region_id,
GcReport {
deleted_files: HashMap::from([(
region_id,
vec![FileId::random(), FileId::random()],
)]),
..Default::default()
},
);
}
let table_stats = HashMap::from([(table_id, region_stats)]);
let file_refs = FileRefsManifest {
manifest_version: (1..=10)
.map(|i| (RegionId::new(table_id, i as u32), 1))
.collect(),
..Default::default()
};
let ctx = Arc::new(
MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
gc_reports: Arc::new(Mutex::new(gc_reports)),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(
table_id,
(1..=10)
.map(|i| (RegionId::new(table_id, i as u32), peer.clone()))
.collect(),
),
)])),
);
// Configure low concurrency limit
let config = GcSchedulerOptions {
region_gc_concurrency: 3, // Only 3 regions can be processed concurrently
retry_backoff_duration: Duration::from_millis(50), // for faster test
..Default::default()
};
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let start_time = Instant::now();
let report = scheduler
.process_datanode_gc(
peer,
candidates.into_iter().map(|c| (table_id, c)).collect(),
)
.await
.unwrap();
let duration = start_time.elapsed();
// All regions should be processed successfully
// Check that all 10 regions have deleted files
assert_eq!(report.deleted_files.len(), 10);
for i in 1..=10 {
let region_id = RegionId::new(table_id, i as u32);
assert!(report.deleted_files.contains_key(&region_id));
assert_eq!(report.deleted_files[&region_id].len(), 2); // Each region has 2 deleted files
}
assert!(report.need_retry_regions.is_empty());
// The concurrency limit itself is hard to observe directly, so this test only
// checks that processing completed successfully and logs the elapsed time
info!(
"Processed 10 regions with concurrency limit 3 in {:?}",
duration
);
}
#[tokio::test]
async fn test_region_gc_concurrency_with_partial_failures() {
init_default_ut_logging();
let table_id = 1;
let peer = Peer::new(1, "");
// Create multiple regions with mixed success/failure
let mut region_stats = Vec::new();
let mut candidates = Vec::new();
let mut gc_reports = HashMap::new();
// Create the context first; its fields are populated below
let ctx = Arc::new(MockSchedulerCtx::default());
for i in 1..=6 {
let region_id = RegionId::new(table_id, i as u32);
let region_stat =
mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
region_stats.push(region_stat);
candidates.push(mock_candidate(region_id));
if i % 2 == 0 {
// Even regions will succeed
gc_reports.insert(
region_id,
GcReport {
deleted_files: HashMap::from([(
region_id,
vec![FileId::random(), FileId::random()],
)]),
..Default::default()
},
);
} else {
// Odd regions will fail - don't add them to gc_reports
// This will cause them to be marked as needing retry
}
}
let table_stats = HashMap::from([(table_id, region_stats)]);
let file_refs = FileRefsManifest {
manifest_version: (1..=6)
.map(|i| (RegionId::new(table_id, i as u32), 1))
.collect(),
..Default::default()
};
// Update the context with the data
*ctx.table_to_region_stats.lock().unwrap() = Some(table_stats);
*ctx.gc_reports.lock().unwrap() = gc_reports;
*ctx.file_refs.lock().unwrap() = Some(file_refs);
let region_routes = (1..=6)
.map(|i| RegionRoute {
region: Region::new_test(RegionId::new(table_id, i as u32)),
leader_peer: Some(peer.clone()),
..Default::default()
})
.collect();
*ctx.table_routes.lock().unwrap() = HashMap::from([(
table_id,
(table_id, PhysicalTableRouteValue::new(region_routes)),
)]);
// Configure concurrency limit
let config = GcSchedulerOptions {
region_gc_concurrency: 2, // Process 2 regions concurrently
retry_backoff_duration: Duration::from_millis(50), // for faster test
..Default::default()
};
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let datanode_to_candidates = HashMap::from([(
peer.clone(),
candidates.into_iter().map(|c| (table_id, c)).collect(),
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.await;
let report = report.per_datanode_reports.get(&peer.id).unwrap();
// Should have 3 successful and 3 failed regions
// Even regions (2, 4, 6) should succeed, odd regions (1, 3, 5) should fail
let mut successful_regions = 0;
let mut failed_regions = 0;
for i in 1..=6 {
let region_id = RegionId::new(table_id, i as u32);
if i % 2 == 0 {
// Even regions should succeed
if report.deleted_files.contains_key(&region_id) {
successful_regions += 1;
}
} else {
// Odd regions should fail - they should be in need_retry_regions
if report.need_retry_regions.contains(&region_id) {
failed_regions += 1;
}
}
}
// In the new implementation, regions that cause gc_regions to return an error
// are added to need_retry_regions. Let's check if we have the expected mix.
info!(
"Successful regions: {}, Failed regions: {}",
successful_regions, failed_regions
);
info!(
"Deleted files: {:?}",
report.deleted_files.keys().collect::<Vec<_>>()
);
info!("Need retry regions: {:?}", report.need_retry_regions);
// The exact count might vary depending on how the mock handles errors,
// but we should have some successful and some failed regions
assert!(
successful_regions > 0,
"Should have at least some successful regions"
);
assert!(
failed_regions > 0,
"Should have at least some failed regions"
);
}
#[tokio::test]
async fn test_region_gc_concurrency_with_retryable_errors() {
init_default_ut_logging();
let table_id = 1;
let peer = Peer::new(1, "");
// Create multiple regions
let mut region_stats = Vec::new();
let mut candidates = Vec::new();
for i in 1..=5 {
let region_id = RegionId::new(table_id, i as u32);
let region_stat =
mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
region_stats.push(region_stat);
candidates.push(mock_candidate(region_id));
}
let table_stats = HashMap::from([(table_id, region_stats)]);
let file_refs = FileRefsManifest {
manifest_version: (1..=5)
.map(|i| (RegionId::new(table_id, i as u32), 1))
.collect(),
..Default::default()
};
let gc_report = (1..=5)
.map(|i| {
let region_id = RegionId::new(table_id, i as u32);
(
region_id,
// Mock the GC report returned on success: each region gets a deleted-files entry, even when there is nothing to delete
GcReport::new(HashMap::from([(region_id, vec![])]), HashSet::new()),
)
})
.collect();
let ctx = Arc::new(
MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
gc_reports: Arc::new(Mutex::new(gc_report)),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(
table_id,
(1..=5)
.map(|i| (RegionId::new(table_id, i as u32), peer.clone()))
.collect(),
),
)])),
);
// Configure concurrency limit
let config = GcSchedulerOptions {
region_gc_concurrency: 2, // Process 2 regions concurrently
retry_backoff_duration: Duration::from_millis(50),
..Default::default()
};
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let datanode_to_candidates = HashMap::from([(
peer.clone(),
candidates.into_iter().map(|c| (table_id, c)).collect(),
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.await;
let report = report.per_datanode_reports.get(&peer.id).unwrap();
// In the new implementation without retry logic, all regions should be processed
// The exact behavior depends on how the mock handles the regions
info!(
"Deleted files: {:?}",
report.deleted_files.keys().collect::<Vec<_>>()
);
info!("Need retry regions: {:?}", report.need_retry_regions);
// We should have processed all 5 regions in some way
let total_processed = report.deleted_files.len() + report.need_retry_regions.len();
assert_eq!(total_processed, 5, "Should have processed all 5 regions");
}

@@ -0,0 +1,197 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use common_meta::datanode::RegionManifestInfo;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use crate::gc::mock::{MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat};
use crate::gc::{GcScheduler, GcSchedulerOptions};
// Configuration Tests
#[tokio::test]
async fn test_different_gc_weights() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
let mut region_stat =
mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB to pass size threshold
if let RegionManifestInfo::Mito {
file_removed_cnt, ..
} = &mut region_stat.region_manifest
{
*file_removed_cnt = 5;
}
let table_stats = HashMap::from([(table_id, vec![region_stat])]);
let ctx = Arc::new(MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
..Default::default()
});
// Test with different weights
let config1 = GcSchedulerOptions {
sst_count_weight: 2.0,
file_removed_count_weight: 0.5,
min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default)
..Default::default()
};
let scheduler1 = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config: config1,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let stats = ctx
.table_to_region_stats
.lock()
.unwrap()
.clone()
.unwrap_or_default();
let candidates1 = scheduler1.select_gc_candidates(&stats).await.unwrap();
let config2 = GcSchedulerOptions {
sst_count_weight: 0.5,
file_removed_count_weight: 2.0,
min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default)
..Default::default()
};
let scheduler2 = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config: config2,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let stats = &ctx
.table_to_region_stats
.lock()
.unwrap()
.clone()
.unwrap_or_default();
let candidates2 = scheduler2.select_gc_candidates(stats).await.unwrap();
// Both configs should select the region; the scores themselves are not compared here
assert_eq!(
candidates1.len(),
1,
"Expected 1 table with candidates for config1, got {}",
candidates1.len()
);
assert_eq!(
candidates2.len(),
1,
"Expected 1 table with candidates for config2, got {}",
candidates2.len()
);
// Verify the region is actually selected
assert!(
candidates1.contains_key(&table_id),
"Config1 should contain table_id {}",
table_id
);
assert!(
candidates2.contains_key(&table_id),
"Config2 should contain table_id {}",
table_id
);
}
#[tokio::test]
async fn test_regions_per_table_threshold() {
init_default_ut_logging();
let table_id = 1;
let mut region_stats = Vec::new();
// Create many regions
for i in 1..=10 {
let region_id = RegionId::new(table_id, i as u32);
let mut stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
if let RegionManifestInfo::Mito {
file_removed_cnt, ..
} = &mut stat.region_manifest
{
*file_removed_cnt = 5;
}
region_stats.push(stat);
}
let table_stats = HashMap::from([(table_id, region_stats)]);
let ctx = Arc::new(MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
..Default::default()
});
let config = GcSchedulerOptions {
regions_per_table_threshold: 3, // Limit to 3 regions per table
min_region_size_threshold: 100 * 1024 * 1024, // 100MB (default)
..Default::default()
};
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let stats = ctx
.table_to_region_stats
.lock()
.unwrap()
.clone()
.unwrap_or_default();
let candidates = scheduler.select_gc_candidates(&stats).await.unwrap();
assert_eq!(
candidates.len(),
1,
"Expected 1 table with candidates, got {}",
candidates.len()
);
if let Some(table_candidates) = candidates.get(&table_id) {
// Should be limited to 3 regions
assert_eq!(
table_candidates.len(),
3,
"Expected 3 candidates for table {}, got {}",
table_id,
table_candidates.len()
);
} else {
panic!("Expected table {} to have candidates", table_id);
}
}

@@ -0,0 +1,293 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use common_meta::datanode::RegionManifestInfo;
use common_meta::peer::Peer;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionRole;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use crate::gc::mock::{
MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat, new_empty_report_with,
};
use crate::gc::{GcScheduler, GcSchedulerOptions};
// Error Handling Tests
#[tokio::test]
async fn test_gc_regions_failure_handling() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
let peer = Peer::new(1, "");
// Create region stat with proper size and file_removed_cnt to ensure it gets selected as candidate
let mut region_stat =
mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
if let RegionManifestInfo::Mito {
file_removed_cnt, ..
} = &mut region_stat.region_manifest
{
*file_removed_cnt = 5;
}
let table_stats = HashMap::from([(table_id, vec![region_stat])]);
// Create a context that will return an error for gc_regions
let mut gc_reports = HashMap::new();
gc_reports.insert(region_id, GcReport::default());
// Inject an error for gc_regions method
let gc_error = crate::error::UnexpectedSnafu {
violated: "Simulated GC failure for testing".to_string(),
}
.build();
let file_refs = FileRefsManifest {
manifest_version: HashMap::from([(region_id, 1)]),
file_refs: HashMap::from([(region_id, HashSet::from([FileId::random()]))]),
};
let ctx = Arc::new(
MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
gc_reports: Arc::new(Mutex::new(gc_reports)),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(table_id, vec![(region_id, peer)]),
)]))
.with_gc_regions_error(gc_error),
);
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config: GcSchedulerOptions::default(),
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
// This should handle the failure gracefully
let report = scheduler.handle_tick().await.unwrap();
// Validate the report shows the failure handling
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode despite failure"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled via need_retry_regions)"
);
// Check that the region is in need_retry_regions due to the failure
let datanode_report = report.per_datanode_reports.values().next().unwrap();
assert_eq!(
datanode_report.need_retry_regions.len(),
1,
"Should have 1 region in need_retry_regions due to failure"
);
assert!(
datanode_report.need_retry_regions.contains(&region_id),
"Region should be in need_retry_regions"
);
// Verify that calls were made despite potential failures
assert_eq!(
*ctx.get_table_to_region_stats_calls.lock().unwrap(),
1,
"Expected 1 call to get_table_to_region_stats"
);
assert!(
*ctx.get_file_references_calls.lock().unwrap() >= 1,
"Expected at least 1 call to get_file_references"
);
assert!(
*ctx.gc_regions_calls.lock().unwrap() >= 1,
"Expected at least 1 call to gc_regions"
);
}
#[tokio::test]
async fn test_get_file_references_failure() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
let peer = Peer::new(1, "");
// Create region stat with proper size and file_removed_cnt to ensure it gets selected as candidate
let mut region_stat =
mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
if let RegionManifestInfo::Mito {
file_removed_cnt, ..
} = &mut region_stat.region_manifest
{
*file_removed_cnt = 5;
}
let table_stats = HashMap::from([(table_id, vec![region_stat])]);
// Create context with empty file refs (simulating failure)
let ctx = Arc::new(
MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
file_refs: Arc::new(Mutex::new(Some(FileRefsManifest::default()))),
gc_reports: Arc::new(Mutex::new(HashMap::from([(
region_id,
new_empty_report_with([region_id]),
)]))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(table_id, vec![(region_id, peer)]),
)])),
);
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config: GcSchedulerOptions {
retry_backoff_duration: Duration::from_millis(10), // shorten for test
..Default::default()
},
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let report = scheduler.handle_tick().await.unwrap();
// Validate the report shows the expected results
// In the new implementation, even if get_file_references fails, we still create a datanode report
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled gracefully)"
);
// The region should be processed but may have empty results due to file refs failure
let datanode_report = report.per_datanode_reports.values().next().unwrap();
// The current implementation still processes the region even with file refs failure
// and creates an empty entry in deleted_files
assert!(
datanode_report.deleted_files.contains_key(&region_id),
"Should have region in deleted_files (even if empty)"
);
assert!(
datanode_report.deleted_files[&region_id].is_empty(),
"Should have empty deleted files due to file refs failure"
);
// Should still attempt to get file references (may be called multiple times due to retry logic)
assert!(
*ctx.get_file_references_calls.lock().unwrap() >= 1,
"Expected at least 1 call to get_file_references, got {}",
*ctx.get_file_references_calls.lock().unwrap()
);
}
#[tokio::test]
async fn test_get_table_route_failure() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
// Create region stat with proper size and file_removed_cnt to ensure it gets selected as candidate
let mut region_stat =
mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
if let RegionManifestInfo::Mito {
file_removed_cnt, ..
} = &mut region_stat.region_manifest
{
*file_removed_cnt = 5;
}
let table_stats = HashMap::from([(table_id, vec![region_stat])]);
// Inject an error for get_table_route method to simulate failure
let route_error = crate::error::UnexpectedSnafu {
violated: "Simulated table route failure for testing".to_string(),
}
.build();
// Create context with table route error injection
let ctx = Arc::new(MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
..Default::default()
});
ctx.set_table_route_error(route_error);
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config: GcSchedulerOptions::default(),
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
// Get candidates first
let stats = &ctx
.table_to_region_stats
.lock()
.unwrap()
.clone()
.unwrap_or_default();
let candidates = scheduler.select_gc_candidates(stats).await.unwrap();
// Convert table-based candidates to datanode-based candidates
let datanode_to_candidates = HashMap::from([(
Peer::new(1, ""),
candidates
.into_iter()
.flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c)))
.collect(),
)]);
// This should handle table route failure gracefully
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.await;
// The route error should surface as a failed datanode rather than a datanode report
assert_eq!(
report.per_datanode_reports.len(),
0,
"Expected 0 datanode reports"
);
assert_eq!(
report.failed_datanodes.len(),
1,
"Expected 1 failed datanode (route error reported via failed_datanodes)"
);
assert!(
report.failed_datanodes.contains_key(&1),
"Failed datanodes should contain the datanode with route error"
);
}

@@ -0,0 +1,272 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use common_meta::peer::Peer;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionRole;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use crate::gc::mock::{MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_candidate, mock_region_stat};
use crate::gc::{GcScheduler, GcSchedulerOptions};
// Full File Listing Tests
#[tokio::test]
async fn test_full_file_listing_first_time_gc() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
let peer = Peer::new(1, "");
let region_stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
let table_stats = HashMap::from([(table_id, vec![region_stat])]);
let gc_report = GcReport {
deleted_files: HashMap::from([(region_id, vec![FileId::random(), FileId::random()])]),
..Default::default()
};
let file_refs = FileRefsManifest {
manifest_version: HashMap::from([(region_id, 1)]),
..Default::default()
};
let ctx = Arc::new(
MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
gc_reports: Arc::new(Mutex::new(HashMap::from([(region_id, gc_report)]))),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(table_id, vec![(region_id, peer.clone())]),
)])),
);
// Configure a long full file listing interval (1 hour); the first GC should still use full listing
let config = GcSchedulerOptions {
full_file_listing_interval: Duration::from_secs(3600), // 1 hour
..Default::default()
};
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
// First GC - should use full listing since region has never been GC'd
let reports = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.await
.unwrap();
assert_eq!(reports.deleted_files.len(), 1);
// Verify that full listing was used by checking the tracker
let tracker = scheduler.region_gc_tracker.lock().await;
let gc_info = tracker
.get(&region_id)
.expect("Region should be in tracker");
assert!(
gc_info.last_full_listing_time.is_some(),
"First GC should use full listing"
);
}
#[tokio::test]
async fn test_full_file_listing_interval_enforcement() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
let peer = Peer::new(1, "");
let region_stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
let table_stats = HashMap::from([(table_id, vec![region_stat])]);
let gc_report = GcReport {
deleted_files: HashMap::from([(region_id, vec![FileId::random(), FileId::random()])]),
..Default::default()
};
let file_refs = FileRefsManifest {
manifest_version: HashMap::from([(region_id, 1)]),
..Default::default()
};
let ctx = Arc::new(
MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
gc_reports: Arc::new(Mutex::new(HashMap::from([(region_id, gc_report)]))),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(table_id, vec![(region_id, peer.clone())]),
)])),
);
// Configure very short full file listing interval for testing
let config = GcSchedulerOptions {
full_file_listing_interval: Duration::from_millis(100), // 100ms
..Default::default()
};
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
// First GC - should use full listing
let reports1 = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.await
.unwrap();
assert_eq!(reports1.deleted_files.len(), 1);
// Get the first full listing time
let first_full_listing_time = {
let tracker = scheduler.region_gc_tracker.lock().await;
let gc_info = tracker
.get(&region_id)
.expect("Region should be in tracker");
gc_info
.last_full_listing_time
.expect("Should have full listing time")
};
// Wait for interval to pass
tokio::time::sleep(Duration::from_millis(150)).await;
// Second GC - should use full listing again since interval has passed
let _reports2 = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.await
.unwrap();
// Verify that full listing was used again
let tracker = scheduler.region_gc_tracker.lock().await;
let gc_info = tracker
.get(&region_id)
.expect("Region should be in tracker");
let second_full_listing_time = gc_info
.last_full_listing_time
.expect("Should have full listing time");
assert!(
second_full_listing_time > first_full_listing_time,
"Second GC should update full listing time"
);
}
#[tokio::test]
async fn test_full_file_listing_no_interval_passed() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
let peer = Peer::new(1, "");
let region_stat = mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
let table_stats = HashMap::from([(table_id, vec![region_stat])]);
let gc_report = GcReport {
deleted_files: HashMap::from([(region_id, vec![FileId::random(), FileId::random()])]),
..Default::default()
};
let file_refs = FileRefsManifest {
manifest_version: HashMap::from([(region_id, 1)]),
..Default::default()
};
let ctx = Arc::new(
MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
gc_reports: Arc::new(Mutex::new(HashMap::from([(region_id, gc_report)]))),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(table_id, vec![(region_id, peer.clone())]),
)])),
);
// Configure long full file listing interval
let config = GcSchedulerOptions {
full_file_listing_interval: Duration::from_secs(3600), // 1 hour
..Default::default()
};
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
// First GC - should use full listing
let reports1 = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.await
.unwrap();
assert_eq!(reports1.deleted_files.len(), 1);
// Get the first full listing time
let first_full_listing_time = {
let tracker = scheduler.region_gc_tracker.lock().await;
let gc_info = tracker
.get(&region_id)
.expect("Region should be in tracker");
gc_info
.last_full_listing_time
.expect("Should have full listing time")
};
// Second GC immediately - should NOT use full listing since interval hasn't passed
let reports2 = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.await
.unwrap();
assert_eq!(reports2.deleted_files.len(), 1);
// Verify that full listing time was NOT updated
let tracker = scheduler.region_gc_tracker.lock().await;
let gc_info = tracker
.get(&region_id)
.expect("Region should be in tracker");
let second_full_listing_time = gc_info
.last_full_listing_time
.expect("Should have full listing time");
assert_eq!(
second_full_listing_time, first_full_listing_time,
"Second GC should not update full listing time when interval hasn't passed"
);
}
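
The three tests above exercise one decision: a region gets a full file listing on its first GC, and again only once `full_file_listing_interval` has elapsed since the last one. A minimal sketch of that decision follows, assuming a simplified tracker entry. `RegionGcInfoSketch` and `should_use_full_listing` are hypothetical names for illustration and are not taken from the scheduler's actual implementation, which is not shown in this diff.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for the tracker entry checked in the tests.
/// The real entry also records the last GC time.
#[derive(Debug, Clone, Copy)]
struct RegionGcInfoSketch {
    last_full_listing_time: Option<Instant>,
}

/// Returns true when a full file listing is due: either the region has never
/// had one, or at least `interval` has elapsed since the last one.
/// This is an assumed rule inferred from the test expectations.
fn should_use_full_listing(
    info: Option<&RegionGcInfoSketch>,
    interval: Duration,
    now: Instant,
) -> bool {
    match info.and_then(|i| i.last_full_listing_time) {
        // Never listed (or region not tracked yet): do a full listing.
        None => true,
        // Listed before: only repeat once the interval has passed.
        Some(last) => now.saturating_duration_since(last) >= interval,
    }
}
```

Under this sketch, an untracked region always qualifies, which matches `test_full_file_listing_first_time_gc`, and an immediate second run with a one hour interval does not, which matches `test_full_file_listing_no_interval_passed`.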

@@ -0,0 +1,252 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use common_meta::datanode::RegionManifestInfo;
use common_meta::peer::Peer;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionRole;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use crate::gc::mock::{
MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat, new_empty_report_with,
};
use crate::gc::{GcScheduler, GcSchedulerOptions};
// Integration Flow Tests
#[tokio::test]
async fn test_full_gc_workflow() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
let peer = Peer::new(1, "");
let mut region_stat =
mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
if let RegionManifestInfo::Mito {
file_removed_cnt, ..
} = &mut region_stat.region_manifest
{
*file_removed_cnt = 5;
}
let table_stats = HashMap::from([(table_id, vec![region_stat])]);
let mut gc_reports = HashMap::new();
gc_reports.insert(
region_id,
GcReport {
deleted_files: HashMap::from([(region_id, vec![FileId::random(), FileId::random()])]),
..Default::default()
},
);
let file_refs = FileRefsManifest {
manifest_version: HashMap::from([(region_id, 1)]),
..Default::default()
};
let ctx = Arc::new(
MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
gc_reports: Arc::new(Mutex::new(gc_reports)),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(table_id, vec![(region_id, peer)]),
)])),
);
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config: GcSchedulerOptions::default(),
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
// Run the full workflow
let report = scheduler.handle_tick().await.unwrap();
// Validate the returned GcJobReport - should have 1 datanode report
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have no failed datanodes"
);
// Get the datanode report
let datanode_report = report.per_datanode_reports.values().next().unwrap();
// Check that the region was processed successfully
assert!(
datanode_report.deleted_files.contains_key(&region_id),
"Should have deleted files for region"
);
assert_eq!(
datanode_report.deleted_files[&region_id].len(),
2,
"Should have 2 deleted files"
);
assert!(
datanode_report.need_retry_regions.is_empty(),
"Should have no retry regions"
);
// Verify all steps were executed
assert_eq!(
*ctx.get_table_to_region_stats_calls.lock().unwrap(),
1,
"Expected 1 call to get_table_to_region_stats"
);
assert_eq!(
*ctx.get_file_references_calls.lock().unwrap(),
1,
"Expected 1 call to get_file_references"
);
assert_eq!(
*ctx.gc_regions_calls.lock().unwrap(),
1,
"Expected 1 call to gc_regions"
);
}
#[tokio::test]
async fn test_tracker_cleanup() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
let peer = Peer::new(1, "");
// Create region stat with proper file_removed_cnt to ensure it gets selected as candidate
let mut region_stat =
mock_region_stat(region_id, RegionRole::Leader, TEST_REGION_SIZE_200MB, 10); // 200MB
if let RegionManifestInfo::Mito {
file_removed_cnt, ..
} = &mut region_stat.region_manifest
{
*file_removed_cnt = 5;
}
let table_stats = HashMap::from([(table_id, vec![region_stat])]);
let mut gc_reports = HashMap::new();
gc_reports.insert(region_id, new_empty_report_with([region_id]));
let file_refs = FileRefsManifest {
manifest_version: HashMap::from([(region_id, 1)]),
..Default::default()
};
let ctx = Arc::new(
MockSchedulerCtx {
table_to_region_stats: Arc::new(Mutex::new(Some(table_stats))),
gc_reports: Arc::new(Mutex::new(gc_reports)),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(table_id, vec![(region_id, peer)]),
)])),
);
let old_region_gc_tracker = {
let mut tracker = HashMap::new();
tracker.insert(
region_id,
crate::gc::tracker::RegionGcInfo {
last_full_listing_time: Some(Instant::now() - Duration::from_secs(7200)), // 2 hours ago
last_gc_time: Instant::now() - Duration::from_secs(7200), // 2 hours ago
},
);
// Also insert a stale entry for a region of a different table; cleanup should remove it
tracker.insert(
RegionId::new(2, 1),
crate::gc::tracker::RegionGcInfo {
last_full_listing_time: Some(Instant::now() - Duration::from_secs(7200)), // 2 hours ago
last_gc_time: Instant::now() - Duration::from_secs(7200), // 2 hours ago
},
);
tracker
};
// Use a custom config with shorter cleanup interval to trigger cleanup
let config = GcSchedulerOptions {
// 30 minutes
tracker_cleanup_interval: Duration::from_secs(1800),
..Default::default()
};
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config,
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(old_region_gc_tracker)),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(
Instant::now() - Duration::from_secs(3600), // Old cleanup time (1 hour ago)
)),
};
let report = scheduler.handle_tick().await.unwrap();
// Validate the returned GcJobReport - should have 1 datanode report
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have no failed datanodes"
);
// Get the datanode report
let datanode_report = report.per_datanode_reports.values().next().unwrap();
// Check that the region was processed successfully
assert!(
datanode_report.deleted_files.contains_key(&region_id),
"Should have deleted files for region"
);
assert!(
datanode_report.need_retry_regions.is_empty(),
"Should have no retry regions"
);
// Verify tracker was updated
let tracker = scheduler.region_gc_tracker.lock().await;
assert!(
tracker.contains_key(&region_id),
"Tracker should contain region {}",
region_id
);
// Only the processed region should remain; the stale entry for table 2 is removed
assert_eq!(tracker.len(), 1, "Tracker should only have 1 entry");
}

@@ -0,0 +1,155 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use common_meta::peer::Peer;
use common_telemetry::init_default_ut_logging;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use crate::gc::mock::{MockSchedulerCtx, new_candidate};
use crate::gc::{GcScheduler, GcSchedulerOptions};
// Edge Case Tests
#[tokio::test]
async fn test_empty_file_refs_manifest() {
init_default_ut_logging();
let table_id = 1;
let region_id = RegionId::new(table_id, 1);
let peer = Peer::new(1, "");
let candidates = HashMap::from([(table_id, vec![new_candidate(region_id, 1.0)])]);
// Empty file refs manifest
let file_refs = FileRefsManifest::default();
let ctx = Arc::new(
MockSchedulerCtx {
file_refs: Arc::new(Mutex::new(Some(file_refs))),
candidates: Arc::new(Mutex::new(Some(candidates))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(table_id, vec![(region_id, peer)]),
)])),
);
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config: GcSchedulerOptions::default(),
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let candidates = ctx.candidates.lock().unwrap().clone().unwrap_or_default();
// Convert table-based candidates to datanode-based candidates
let peer = Peer::new(1, "");
let datanode_to_candidates = HashMap::from([(
peer,
candidates
.into_iter()
.flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c)))
.collect(),
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.await;
// Should handle empty file refs gracefully
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
}
#[tokio::test]
async fn test_multiple_regions_per_table() {
init_default_ut_logging();
let table_id = 1;
let region1 = RegionId::new(table_id, 1);
let region2 = RegionId::new(table_id, 2);
let region3 = RegionId::new(table_id, 3);
let peer = Peer::new(1, "");
let candidates = HashMap::from([(
table_id,
vec![
new_candidate(region1, 1.0),
new_candidate(region2, 2.0),
new_candidate(region3, 3.0),
],
)]);
let mut gc_reports = HashMap::new();
gc_reports.insert(region1, GcReport::default());
gc_reports.insert(region2, GcReport::default());
gc_reports.insert(region3, GcReport::default());
let file_refs = FileRefsManifest {
manifest_version: HashMap::from([(region1, 1), (region2, 1), (region3, 1)]),
..Default::default()
};
let ctx = Arc::new(
MockSchedulerCtx {
gc_reports: Arc::new(Mutex::new(gc_reports)),
file_refs: Arc::new(Mutex::new(Some(file_refs))),
candidates: Arc::new(Mutex::new(Some(candidates))),
..Default::default()
}
.with_table_routes(HashMap::from([(
table_id,
(
table_id,
vec![
(region1, peer.clone()),
(region2, peer.clone()),
(region3, peer.clone()),
],
),
)])),
);
let scheduler = GcScheduler {
ctx: ctx.clone(),
receiver: GcScheduler::channel().1,
config: GcSchedulerOptions::default(),
region_gc_tracker: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
let candidates = ctx.candidates.lock().unwrap().clone().unwrap_or_default();
// Convert table-based candidates to datanode-based candidates
let datanode_to_candidates = HashMap::from([(
peer.clone(),
candidates
.into_iter()
.flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c)))
.collect(),
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.await;
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
}