From 91c6be273ce374de3fcae82c46a876c3a3bcdbf5 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 5 Feb 2026 16:10:04 +0800 Subject: [PATCH] test: non happy path Signed-off-by: discord9 --- .../src/heartbeat/handler/gc_worker.rs | 15 +- src/meta-srv/src/gc/mock.rs | 3 + src/meta-srv/src/gc/scheduler.rs | 119 +++++---- src/meta-srv/src/service/procedure.rs | 59 ++--- src/mito2/src/gc.rs | 3 + src/store-api/src/storage/file.rs | 13 + tests-integration/src/test_util.rs | 9 + tests-integration/src/tests/gc/admin.rs | 240 +++++++++++++++++- 8 files changed, 345 insertions(+), 116 deletions(-) diff --git a/src/datanode/src/heartbeat/handler/gc_worker.rs b/src/datanode/src/heartbeat/handler/gc_worker.rs index 9eb5fb368f..f005793d98 100644 --- a/src/datanode/src/heartbeat/handler/gc_worker.rs +++ b/src/datanode/src/heartbeat/handler/gc_worker.rs @@ -102,17 +102,10 @@ impl InstructionHandler for GcRegionsHandler { reports.push(report); } - // Merge reports - let mut merged_report = GcReport::default(); - for report in reports { - merged_report - .deleted_files - .extend(report.deleted_files.into_iter()); - merged_report - .deleted_indexes - .extend(report.deleted_indexes.into_iter()); - } - Ok(merged_report) + // Merge reports + let mut merged_report = GcReport::default(); + for report in reports { + merged_report.merge(report); } .instrument(common_telemetry::tracing::info_span!("gc_worker_run")), ), diff --git a/src/meta-srv/src/gc/mock.rs b/src/meta-srv/src/gc/mock.rs index 67e886fba3..6c736490ab 100644 --- a/src/meta-srv/src/gc/mock.rs +++ b/src/meta-srv/src/gc/mock.rs @@ -50,13 +50,16 @@ pub const TEST_REGION_SIZE_200MB: u64 = 200_000_000; /// Helper function to create an empty GcReport for the given region IDs pub fn new_empty_report_with(region_ids: impl IntoIterator) -> GcReport { let mut deleted_files = HashMap::new(); + let mut processed_regions = HashSet::new(); for region_id in region_ids { deleted_files.insert(region_id, vec![]); + processed_regions.insert(region_id); } GcReport { deleted_files, deleted_indexes: HashMap::new(), need_retry_regions: HashSet::new(), + processed_regions, } } diff --git a/src/meta-srv/src/gc/scheduler.rs b/src/meta-srv/src/gc/scheduler.rs index 9aa44397eb..a30ff5578e 100644 --- a/src/meta-srv/src/gc/scheduler.rs +++ b/src/meta-srv/src/gc/scheduler.rs @@ -197,76 +197,73 @@ impl GcScheduler { ) -> Result { info!("Start to handle manual gc request"); - let report = if let Some(regions) = region_ids { - let full_listing = full_file_listing.unwrap_or(false); - let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout); - let mut dropped_regions = Vec::new(); - let mut active_regions = Vec::new(); - let mut dropped_routes_override = Region2Peers::new(); + // No specific regions, use default tick behavior + let Some(regions) = region_ids else { + let report = self.trigger_gc().await?; + info!("Finished manual gc request"); + return Ok(report); + }; - if !regions.is_empty() { - let region_set: HashSet = regions.iter().copied().collect(); - let table_reparts = self.ctx.get_table_reparts().await?; - let dropped_collector = DroppedRegionCollector::new( - self.ctx.as_ref(), - &self.config, - &self.region_gc_tracker, - ); - let dropped_assignment = dropped_collector - .collect_and_assign_with_cooldown(&table_reparts, false) - .await?; + // Empty regions list, return empty report + if regions.is_empty() { + info!("Finished manual gc request"); + return Ok(GcJobReport::default()); + } - let mut dropped_region_set = HashSet::new(); - for (_peer, overrides) in dropped_assignment.region_routes_override { - for (region_id, route) in overrides { - if region_set.contains(®ion_id) { - dropped_region_set.insert(region_id); - dropped_routes_override.insert(region_id, route); - } - } - } + let full_listing = full_file_listing.unwrap_or(false); + let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout); - for region_id in regions { - if dropped_region_set.contains(®ion_id) { - dropped_regions.push(region_id); - } else { - active_regions.push(region_id); - } + let region_set: HashSet = regions.iter().copied().collect(); + let table_reparts = self.ctx.get_table_reparts().await?; + let dropped_collector = + DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker); + let dropped_assignment = dropped_collector + .collect_and_assign_with_cooldown(&table_reparts, false) + .await?; + + let mut dropped_region_set = HashSet::new(); + let mut dropped_routes_override = Region2Peers::new(); + for overrides in dropped_assignment.region_routes_override.into_values() { + for (region_id, route) in overrides { + if region_set.contains(®ion_id) { + dropped_region_set.insert(region_id); + dropped_routes_override.insert(region_id, route); } } + } - let mut combined_report = GcReport::default(); + let (dropped_regions, active_regions): (Vec<_>, Vec<_>) = regions + .into_iter() + .partition(|region_id| dropped_region_set.contains(region_id)); - if !active_regions.is_empty() { - let report = self - .ctx - .gc_regions( - &active_regions, - full_listing, - gc_timeout, - Region2Peers::new(), - ) - .await?; - combined_report.merge(report); - } + let mut combined_report = GcReport::default(); - if !dropped_regions.is_empty() { - let report = self - .ctx - .gc_regions(&dropped_regions, true, gc_timeout, dropped_routes_override) - .await?; - combined_report.merge(report); - } + if !active_regions.is_empty() { + let report = self + .ctx + .gc_regions( + &active_regions, + full_listing, + gc_timeout, + Region2Peers::new(), + ) + .await?; + combined_report.merge(report); + } - let mut per_datanode_reports = HashMap::new(); - per_datanode_reports.insert(0, combined_report); - GcJobReport { - per_datanode_reports, - failed_datanodes: HashMap::new(), - } - } else { - // No specific regions, use default tick behavior - self.trigger_gc().await? + if !dropped_regions.is_empty() { + let report = self + .ctx + .gc_regions(&dropped_regions, true, gc_timeout, dropped_routes_override) + .await?; + combined_report.merge(report); + } + + let mut per_datanode_reports = HashMap::new(); + per_datanode_reports.insert(0, combined_report); + let report = GcJobReport { + per_datanode_reports, + failed_datanodes: HashMap::new(), }; info!("Finished manual gc request"); diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index eb37bbddd9..269dfd7d1d 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -310,39 +310,8 @@ impl Metasrv { .into_iter() .map(RegionId::from_u64) .collect(); - - // Use GcTickerRef to trigger manual GC - let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu { - violated: "GC ticker not available".to_string(), - })?; - - let (tx, rx) = tokio::sync::oneshot::channel(); - gc_ticker - .sender - .send(gc::Event::Manually { - sender: tx, - region_ids: Some(region_ids.clone()), - full_file_listing: Some(request.full_file_listing), - timeout: Some(request.timeout), - }) + self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout) .await - .map_err(|_| { - error::UnexpectedSnafu { - violated: "Failed to send GC event".to_string(), - } - .build() - })?; - - let job_report = rx.await.map_err(|_| { - error::UnexpectedSnafu { - violated: "GC job channel closed unexpectedly".to_string(), - } - .build() - })?; - - let report = gc_job_report_to_gc_report(job_report); - - Ok(gc_report_to_response(&report, region_ids.len() as u64)) } async fn handle_gc_table(&self, request: MetaGcTableRequest) -> error::Result { @@ -370,8 +339,17 @@ impl Metasrv { .context(TableMetadataManagerSnafu)?; let region_ids: Vec = route.region_routes.iter().map(|r| r.region.id).collect(); + self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout) + .await + } - // Use GcTickerRef to trigger manual GC + /// Triggers manual GC for specified regions and returns the GC response. + async fn trigger_gc_for_regions( + &self, + region_ids: Vec, + full_file_listing: bool, + timeout: Duration, + ) -> error::Result { let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu { violated: "GC ticker not available".to_string(), })?; @@ -381,9 +359,9 @@ impl Metasrv { .sender .send(gc::Event::Manually { sender: tx, - region_ids: Some(region_ids.clone()), - full_file_listing: Some(request.full_file_listing), - timeout: Some(request.timeout), + region_ids: Some(region_ids), + full_file_listing: Some(full_file_listing), + timeout: Some(timeout), }) .await .map_err(|_| { @@ -402,7 +380,7 @@ impl Metasrv { let report = gc_job_report_to_gc_report(job_report); - Ok(gc_report_to_response(&report, region_ids.len() as u64)) + Ok(gc_report_to_response(&report)) } } @@ -415,10 +393,7 @@ fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api:: gc_report } -fn gc_report_to_response( - report: &store_api::storage::GcReport, - processed_regions: u64, -) -> GcResponse { +fn gc_report_to_response(report: &store_api::storage::GcReport) -> GcResponse { let deleted_files = report.deleted_files.values().map(|v| v.len() as u64).sum(); let deleted_indexes = report .deleted_indexes @@ -426,7 +401,7 @@ fn gc_report_to_response( .map(|v| v.len() as u64) .sum(); GcResponse { - processed_regions, + processed_regions: report.processed_regions.len() as u64, need_retry_regions: report .need_retry_regions .iter() diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index cae67637be..30d0f617fa 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -282,6 +282,7 @@ impl LocalGcWorker { let mut deleted_files = HashMap::new(); let mut deleted_indexes = HashMap::new(); + let mut processed_regions = HashSet::new(); let tmp_ref_files = self.read_tmp_ref_files().await?; for (region_id, region) in &self.regions { let per_region_time = std::time::Instant::now(); @@ -309,6 +310,7 @@ impl LocalGcWorker { .collect_vec(); deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect()); deleted_indexes.insert(*region_id, index_files); + processed_regions.insert(*region_id); debug!( "GC for region {} took {} secs.", region_id, @@ -323,6 +325,7 @@ impl LocalGcWorker { deleted_files, deleted_indexes, need_retry_regions: HashSet::new(), + processed_regions, }; Ok(report) } diff --git a/src/store-api/src/storage/file.rs b/src/store-api/src/storage/file.rs index 341a4c301a..8acc0e3b93 100644 --- a/src/store-api/src/storage/file.rs +++ b/src/store-api/src/storage/file.rs @@ -114,6 +114,8 @@ pub struct GcReport { pub deleted_indexes: HashMap>, /// Regions that need retry in next gc round, usually because their tmp ref files are outdated pub need_retry_regions: HashSet, + /// Regions successfully processed in this GC run + pub processed_regions: HashSet, } impl GcReport { @@ -126,6 +128,7 @@ impl GcReport { deleted_files, deleted_indexes, need_retry_regions, + processed_regions: HashSet::new(), } } @@ -139,7 +142,17 @@ impl GcReport { ); *self_files = dedup.into_iter().collect(); } + for (region, files) in other.deleted_indexes { + let self_files = self.deleted_indexes.entry(region).or_default(); + let dedup: HashSet<(FileId, IndexVersion)> = HashSet::from_iter( + std::mem::take(self_files) + .into_iter() + .chain(files.iter().cloned()), + ); + *self_files = dedup.into_iter().collect(); + } self.need_retry_regions.extend(other.need_retry_regions); + self.processed_regions.extend(other.processed_regions); // Remove regions that have succeeded from need_retry_regions self.need_retry_regions .retain(|region| !self.deleted_files.contains_key(region)); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 0c232c3c69..e5e6321412 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -927,6 +927,15 @@ pub async fn execute_sql(instance: &Arc, sql: &str) -> Output { .unwrap() } +pub async fn try_execute_sql( + instance: &Arc, + sql: &str, +) -> servers::error::Result { + SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc()) + .await + .remove(0) +} + pub async fn execute_sql_and_expect(instance: &Arc, sql: &str, expected: &str) { let output = execute_sql(instance, sql).await; let output = output.data.pretty_print().await; diff --git a/tests-integration/src/tests/gc/admin.rs b/tests-integration/src/tests/gc/admin.rs index 4686915567..2ae4844a0a 100644 --- a/tests-integration/src/tests/gc/admin.rs +++ b/tests-integration/src/tests/gc/admin.rs @@ -17,14 +17,16 @@ use std::time::Duration; use client::OutputData; +use common_meta::key::table_repart::TableRepartValue; use common_recordbatch::RecordBatches; use common_telemetry::info; use common_test_util::recordbatch::check_output_stream; -use datatypes::arrow::array::AsArray; +use datatypes::arrow::array::{Array, AsArray}; use datatypes::arrow::datatypes::UInt64Type; +use store_api::storage::RegionId; use super::{distributed_with_gc, list_sst_files}; -use crate::test_util::{StorageType, execute_sql}; +use crate::test_util::{StorageType, execute_sql, try_execute_sql}; #[tokio::test] async fn test_admin_gc_table_different_store() { @@ -289,3 +291,237 @@ async fn test_admin_gc_regions(store_type: &StorageType) { info!("ADMIN GC_REGIONS test completed successfully"); } + +#[tokio::test] +async fn test_admin_gc_missing_cases_different_store() { + let _ = dotenv::dotenv(); + common_telemetry::init_default_ut_logging(); + let store_type = StorageType::build_storage_types_based_on_env(); + info!("store type: {:?}", store_type); + for store in store_type { + info!( + "Running admin GC missing cases test with storage type: {}", + store + ); + test_admin_gc_missing_cases(&store).await; + } +} + +async fn test_admin_gc_missing_cases(store_type: &StorageType) { + let (test_context, _guard) = distributed_with_gc(store_type).await; + let instance = test_context.frontend(); + let metasrv = test_context.metasrv(); + + // Case 1: Invalid table name + let invalid_table_sql = "ADMIN GC_TABLE('no_such_table')"; + let invalid_table_result = try_execute_sql(&instance, invalid_table_sql).await; + assert!( + invalid_table_result.is_err(), + "Expected error for invalid table name" + ); + + // Case 2: Invalid region id (same table, non-existent region number) + let invalid_region_table = "test_admin_gc_invalid_region"; + create_append_table(&instance, invalid_region_table, None).await; + let table_id = get_table_id(&instance, invalid_region_table).await; + let region_ids = fetch_region_ids(&instance, invalid_region_table).await; + let base_region = RegionId::from_u64(*region_ids.first().expect("region id exists")); + let invalid_region = RegionId::new(table_id, base_region.region_number().saturating_add(100)); + let invalid_region_sql = format!("ADMIN GC_REGIONS({})", invalid_region.as_u64()); + let sst_before_invalid_region = list_sst_files(&test_context).await; + let invalid_region_result = try_execute_sql(&instance, &invalid_region_sql).await; + match invalid_region_result { + Ok(output) => { + let processed_regions = extract_u64_output(output.data).await; + assert_eq!(processed_regions, 1); + let sst_after_invalid_region = list_sst_files(&test_context).await; + assert_eq!( + sst_before_invalid_region.len(), + sst_after_invalid_region.len() + ); + } + Err(_) => { + let sst_after_invalid_region = list_sst_files(&test_context).await; + assert_eq!( + sst_before_invalid_region.len(), + sst_after_invalid_region.len() + ); + } + } + + // Case 3: No garbage to collect (idempotent) + let no_garbage_table = "test_admin_gc_no_garbage"; + create_append_table(&instance, no_garbage_table, None).await; + insert_and_flush(&instance, no_garbage_table, 1).await; + let sst_before_gc = list_sst_files(&test_context).await; + let no_garbage_gc_sql = format!("ADMIN GC_TABLE('{no_garbage_table}')"); + let no_garbage_output = execute_sql(&instance, &no_garbage_gc_sql).await; + let processed_regions = extract_u64_output(no_garbage_output.data).await; + assert_eq!(processed_regions, 1); + let sst_after_gc = list_sst_files(&test_context).await; + assert_eq!(sst_before_gc.len(), sst_after_gc.len()); + + // Case 4: Multi-region table, multi-region GC + let multi_region_table = "test_admin_gc_multi_region"; + let partition_clause = + Some("PARTITION ON COLUMNS (host) (host < 'm', host >= 'm')".to_string()); + create_append_table(&instance, multi_region_table, partition_clause.as_deref()).await; + insert_and_flush(&instance, multi_region_table, 4).await; + let sst_before_compaction = list_sst_files(&test_context).await; + let compact_sql = format!("ADMIN COMPACT_TABLE('{multi_region_table}')"); + execute_sql(&instance, &compact_sql).await; + tokio::time::sleep(Duration::from_secs(2)).await; + let sst_after_compaction = list_sst_files(&test_context).await; + assert!(sst_after_compaction.len() >= sst_before_compaction.len()); + let region_ids = fetch_region_ids(&instance, multi_region_table).await; + let gc_regions_sql = format!( + "ADMIN GC_REGIONS({})", + region_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(",") + ); + let gc_regions_output = execute_sql(&instance, &gc_regions_sql).await; + let processed_regions = extract_u64_output(gc_regions_output.data).await; + assert_eq!(processed_regions as usize, region_ids.len()); + let sst_after_gc = list_sst_files(&test_context).await; + assert!(sst_after_gc.len() < sst_after_compaction.len()); + + // Case 5: Manual GC on dropped region + let dropped_region_table = "test_admin_gc_dropped_region"; + create_append_table(&instance, dropped_region_table, partition_clause.as_deref()).await; + let dropped_table_id = get_table_id(&instance, dropped_region_table).await; + let (_routes, regions) = + super::get_table_route(metasrv.table_metadata_manager(), dropped_table_id).await; + let base_region = *regions.first().expect("table has at least one region"); + let dropped_region = RegionId::new( + dropped_table_id, + base_region.region_number().saturating_add(100), + ); + let dst_region = RegionId::new( + dropped_table_id, + base_region.region_number().saturating_add(200), + ); + let repart_mgr = metasrv.table_metadata_manager().table_repart_manager(); + let current = repart_mgr + .get_with_raw_bytes(dropped_table_id) + .await + .unwrap(); + let mut repart_value = TableRepartValue::new(); + repart_value.update_mappings(dropped_region, &[dst_region]); + repart_mgr + .upsert_value(dropped_table_id, current, &repart_value) + .await + .unwrap(); + let dropped_gc_sql = format!("ADMIN GC_REGIONS({})", dropped_region.as_u64()); + let dropped_gc_output = execute_sql(&instance, &dropped_gc_sql).await; + let processed_regions = extract_u64_output(dropped_gc_output.data).await; + assert_eq!(processed_regions, 1); + + // Case 6: Cooldown bypass for manual GC (should still process regions on immediate re-run) + let cooldown_region = *region_ids.first().expect("region id exists"); + let cooldown_gc_sql = format!("ADMIN GC_REGIONS({cooldown_region})"); + execute_sql(&instance, &cooldown_gc_sql).await; + let cooldown_gc_output = execute_sql(&instance, &cooldown_gc_sql).await; + let processed_regions = extract_u64_output(cooldown_gc_output.data).await; + assert_eq!(processed_regions, 1); +} + +async fn create_append_table( + instance: &std::sync::Arc, + table_name: &str, + partition_clause: Option<&str>, +) { + let partition_clause = partition_clause.unwrap_or(""); + let create_table_sql = format!( + "\ + CREATE TABLE {table_name} (\ + ts TIMESTAMP TIME INDEX,\ + val DOUBLE,\ + host STRING\ + ) {partition_clause} WITH (append_mode = 'true')\ + " + ); + execute_sql(instance, &create_table_sql).await; +} + +async fn insert_and_flush( + instance: &std::sync::Arc, + table_name: &str, + rounds: usize, +) { + for i in 0..rounds { + let insert_sql = format!( + "\ + INSERT INTO {table_name} (ts, val, host) VALUES\ + ('2023-01-0{} 10:00:00', {}, 'host{}'),\ + ('2023-01-0{} 11:00:00', {}, 'host{}'),\ + ('2023-01-0{} 12:00:00', {}, 'host{}')\ + ", + i + 1, + 10.0 + i as f64, + i, + i + 1, + 20.0 + i as f64, + i, + i + 1, + 30.0 + i as f64, + i + ); + execute_sql(instance, &insert_sql).await; + let flush_sql = format!("ADMIN FLUSH_TABLE('{table_name}')"); + execute_sql(instance, &flush_sql).await; + } +} + +async fn fetch_region_ids( + instance: &std::sync::Arc, + table_name: &str, +) -> Vec { + let region_id_sql = format!( + "SELECT greptime_partition_id FROM information_schema.partitions WHERE table_name = '{}' ORDER BY greptime_partition_id", + table_name + ); + let region_output = execute_sql(instance, ®ion_id_sql).await; + let OutputData::Stream(region_stream) = region_output.data else { + panic!("Expected stream output for region id query"); + }; + let batches = RecordBatches::try_collect(region_stream).await.unwrap(); + let mut region_ids = Vec::new(); + for batch in batches.iter() { + let column = batch.column(0); + let array = column.as_primitive::(); + for idx in 0..array.len() { + if array.is_valid(idx) { + region_ids.push(array.value(idx)); + } + } + } + region_ids +} + +async fn get_table_id( + instance: &std::sync::Arc, + table_name: &str, +) -> table::metadata::TableId { + let table = instance + .catalog_manager() + .table("greptime", "public", table_name, None) + .await + .unwrap() + .unwrap(); + table.table_info().table_id() +} + +async fn extract_u64_output(output: OutputData) -> u64 { + let recordbatches = match output { + OutputData::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(), + OutputData::RecordBatches(recordbatches) => recordbatches, + _ => panic!("Unexpected output type"), + }; + let batch = recordbatches.iter().next().expect("non-empty recordbatch"); + let column = batch.column(0); + let array = column.as_primitive::(); + array.value(0) +}