test: non-happy path

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-02-05 16:10:04 +08:00
parent 4dc9cd2e74
commit 91c6be273c
8 changed files with 345 additions and 116 deletions

View File

@@ -102,17 +102,10 @@ impl InstructionHandler for GcRegionsHandler {
reports.push(report);
}
// Merge reports
let mut merged_report = GcReport::default();
for report in reports {
merged_report
.deleted_files
.extend(report.deleted_files.into_iter());
merged_report
.deleted_indexes
.extend(report.deleted_indexes.into_iter());
}
Ok(merged_report)
// Merge reports
let mut merged_report = GcReport::default();
for report in reports {
merged_report.merge(report);
}
.instrument(common_telemetry::tracing::info_span!("gc_worker_run")),
),

View File

@@ -50,13 +50,16 @@ pub const TEST_REGION_SIZE_200MB: u64 = 200_000_000;
/// Builds a `GcReport` that marks every given region as processed but records
/// no deleted files or indexes and no retries — i.e. an "empty" report for
/// tests that only care about which regions were visited.
pub fn new_empty_report_with(region_ids: impl IntoIterator<Item = RegionId>) -> GcReport {
    // Materialize the ids once so both collections are built from the same set.
    let ids: Vec<RegionId> = region_ids.into_iter().collect();
    let deleted_files: HashMap<_, _> = ids.iter().map(|id| (*id, vec![])).collect();
    let processed_regions: HashSet<_> = ids.into_iter().collect();
    GcReport {
        deleted_files,
        deleted_indexes: HashMap::new(),
        need_retry_regions: HashSet::new(),
        processed_regions,
    }
}

View File

@@ -197,76 +197,73 @@ impl GcScheduler {
) -> Result<GcJobReport> {
info!("Start to handle manual gc request");
let report = if let Some(regions) = region_ids {
let full_listing = full_file_listing.unwrap_or(false);
let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout);
let mut dropped_regions = Vec::new();
let mut active_regions = Vec::new();
let mut dropped_routes_override = Region2Peers::new();
// No specific regions, use default tick behavior
let Some(regions) = region_ids else {
let report = self.trigger_gc().await?;
info!("Finished manual gc request");
return Ok(report);
};
if !regions.is_empty() {
let region_set: HashSet<RegionId> = regions.iter().copied().collect();
let table_reparts = self.ctx.get_table_reparts().await?;
let dropped_collector = DroppedRegionCollector::new(
self.ctx.as_ref(),
&self.config,
&self.region_gc_tracker,
);
let dropped_assignment = dropped_collector
.collect_and_assign_with_cooldown(&table_reparts, false)
.await?;
// Empty regions list, return empty report
if regions.is_empty() {
info!("Finished manual gc request");
return Ok(GcJobReport::default());
}
let mut dropped_region_set = HashSet::new();
for (_peer, overrides) in dropped_assignment.region_routes_override {
for (region_id, route) in overrides {
if region_set.contains(&region_id) {
dropped_region_set.insert(region_id);
dropped_routes_override.insert(region_id, route);
}
}
}
let full_listing = full_file_listing.unwrap_or(false);
let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout);
for region_id in regions {
if dropped_region_set.contains(&region_id) {
dropped_regions.push(region_id);
} else {
active_regions.push(region_id);
}
let region_set: HashSet<RegionId> = regions.iter().copied().collect();
let table_reparts = self.ctx.get_table_reparts().await?;
let dropped_collector =
DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
let dropped_assignment = dropped_collector
.collect_and_assign_with_cooldown(&table_reparts, false)
.await?;
let mut dropped_region_set = HashSet::new();
let mut dropped_routes_override = Region2Peers::new();
for overrides in dropped_assignment.region_routes_override.into_values() {
for (region_id, route) in overrides {
if region_set.contains(&region_id) {
dropped_region_set.insert(region_id);
dropped_routes_override.insert(region_id, route);
}
}
}
let mut combined_report = GcReport::default();
let (dropped_regions, active_regions): (Vec<_>, Vec<_>) = regions
.into_iter()
.partition(|region_id| dropped_region_set.contains(region_id));
if !active_regions.is_empty() {
let report = self
.ctx
.gc_regions(
&active_regions,
full_listing,
gc_timeout,
Region2Peers::new(),
)
.await?;
combined_report.merge(report);
}
let mut combined_report = GcReport::default();
if !dropped_regions.is_empty() {
let report = self
.ctx
.gc_regions(&dropped_regions, true, gc_timeout, dropped_routes_override)
.await?;
combined_report.merge(report);
}
if !active_regions.is_empty() {
let report = self
.ctx
.gc_regions(
&active_regions,
full_listing,
gc_timeout,
Region2Peers::new(),
)
.await?;
combined_report.merge(report);
}
let mut per_datanode_reports = HashMap::new();
per_datanode_reports.insert(0, combined_report);
GcJobReport {
per_datanode_reports,
failed_datanodes: HashMap::new(),
}
} else {
// No specific regions, use default tick behavior
self.trigger_gc().await?
if !dropped_regions.is_empty() {
let report = self
.ctx
.gc_regions(&dropped_regions, true, gc_timeout, dropped_routes_override)
.await?;
combined_report.merge(report);
}
let mut per_datanode_reports = HashMap::new();
per_datanode_reports.insert(0, combined_report);
let report = GcJobReport {
per_datanode_reports,
failed_datanodes: HashMap::new(),
};
info!("Finished manual gc request");

View File

@@ -310,39 +310,8 @@ impl Metasrv {
.into_iter()
.map(RegionId::from_u64)
.collect();
// Use GcTickerRef to trigger manual GC
let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu {
violated: "GC ticker not available".to_string(),
})?;
let (tx, rx) = tokio::sync::oneshot::channel();
gc_ticker
.sender
.send(gc::Event::Manually {
sender: tx,
region_ids: Some(region_ids.clone()),
full_file_listing: Some(request.full_file_listing),
timeout: Some(request.timeout),
})
self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout)
.await
.map_err(|_| {
error::UnexpectedSnafu {
violated: "Failed to send GC event".to_string(),
}
.build()
})?;
let job_report = rx.await.map_err(|_| {
error::UnexpectedSnafu {
violated: "GC job channel closed unexpectedly".to_string(),
}
.build()
})?;
let report = gc_job_report_to_gc_report(job_report);
Ok(gc_report_to_response(&report, region_ids.len() as u64))
}
async fn handle_gc_table(&self, request: MetaGcTableRequest) -> error::Result<GcResponse> {
@@ -370,8 +339,17 @@ impl Metasrv {
.context(TableMetadataManagerSnafu)?;
let region_ids: Vec<RegionId> = route.region_routes.iter().map(|r| r.region.id).collect();
self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout)
.await
}
// Use GcTickerRef to trigger manual GC
/// Triggers manual GC for specified regions and returns the GC response.
async fn trigger_gc_for_regions(
&self,
region_ids: Vec<RegionId>,
full_file_listing: bool,
timeout: Duration,
) -> error::Result<GcResponse> {
let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu {
violated: "GC ticker not available".to_string(),
})?;
@@ -381,9 +359,9 @@ impl Metasrv {
.sender
.send(gc::Event::Manually {
sender: tx,
region_ids: Some(region_ids.clone()),
full_file_listing: Some(request.full_file_listing),
timeout: Some(request.timeout),
region_ids: Some(region_ids),
full_file_listing: Some(full_file_listing),
timeout: Some(timeout),
})
.await
.map_err(|_| {
@@ -402,7 +380,7 @@ impl Metasrv {
let report = gc_job_report_to_gc_report(job_report);
Ok(gc_report_to_response(&report, region_ids.len() as u64))
Ok(gc_report_to_response(&report))
}
}
@@ -415,10 +393,7 @@ fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api::
gc_report
}
fn gc_report_to_response(
report: &store_api::storage::GcReport,
processed_regions: u64,
) -> GcResponse {
fn gc_report_to_response(report: &store_api::storage::GcReport) -> GcResponse {
let deleted_files = report.deleted_files.values().map(|v| v.len() as u64).sum();
let deleted_indexes = report
.deleted_indexes
@@ -426,7 +401,7 @@ fn gc_report_to_response(
.map(|v| v.len() as u64)
.sum();
GcResponse {
processed_regions,
processed_regions: report.processed_regions.len() as u64,
need_retry_regions: report
.need_retry_regions
.iter()

View File

@@ -282,6 +282,7 @@ impl LocalGcWorker {
let mut deleted_files = HashMap::new();
let mut deleted_indexes = HashMap::new();
let mut processed_regions = HashSet::new();
let tmp_ref_files = self.read_tmp_ref_files().await?;
for (region_id, region) in &self.regions {
let per_region_time = std::time::Instant::now();
@@ -309,6 +310,7 @@ impl LocalGcWorker {
.collect_vec();
deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
deleted_indexes.insert(*region_id, index_files);
processed_regions.insert(*region_id);
debug!(
"GC for region {} took {} secs.",
region_id,
@@ -323,6 +325,7 @@ impl LocalGcWorker {
deleted_files,
deleted_indexes,
need_retry_regions: HashSet::new(),
processed_regions,
};
Ok(report)
}

View File

@@ -114,6 +114,8 @@ pub struct GcReport {
pub deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
/// Regions that need retry in next gc round, usually because their tmp ref files are outdated
pub need_retry_regions: HashSet<RegionId>,
/// Regions successfully processed in this GC run
pub processed_regions: HashSet<RegionId>,
}
impl GcReport {
@@ -126,6 +128,7 @@ impl GcReport {
deleted_files,
deleted_indexes,
need_retry_regions,
processed_regions: HashSet::new(),
}
}
@@ -139,7 +142,17 @@ impl GcReport {
);
*self_files = dedup.into_iter().collect();
}
for (region, files) in other.deleted_indexes {
let self_files = self.deleted_indexes.entry(region).or_default();
let dedup: HashSet<(FileId, IndexVersion)> = HashSet::from_iter(
std::mem::take(self_files)
.into_iter()
.chain(files.iter().cloned()),
);
*self_files = dedup.into_iter().collect();
}
self.need_retry_regions.extend(other.need_retry_regions);
self.processed_regions.extend(other.processed_regions);
// Remove regions that have succeeded from need_retry_regions
self.need_retry_regions
.retain(|region| !self.deleted_files.contains_key(region));

View File

@@ -927,6 +927,15 @@ pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
.unwrap()
}
/// Like `execute_sql`, but surfaces the statement's outcome as a `Result`
/// instead of unwrapping, so tests can assert on error paths.
pub async fn try_execute_sql(
    instance: &Arc<Instance>,
    sql: &str,
) -> servers::error::Result<Output> {
    // do_query yields one result per statement; callers pass a single
    // statement, so the first (and only) entry is taken.
    SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc())
        .await
        .remove(0)
}
pub async fn execute_sql_and_expect(instance: &Arc<Instance>, sql: &str, expected: &str) {
let output = execute_sql(instance, sql).await;
let output = output.data.pretty_print().await;

View File

@@ -17,14 +17,16 @@
use std::time::Duration;
use client::OutputData;
use common_meta::key::table_repart::TableRepartValue;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::array::{Array, AsArray};
use datatypes::arrow::datatypes::UInt64Type;
use store_api::storage::RegionId;
use super::{distributed_with_gc, list_sst_files};
use crate::test_util::{StorageType, execute_sql};
use crate::test_util::{StorageType, execute_sql, try_execute_sql};
#[tokio::test]
async fn test_admin_gc_table_different_store() {
@@ -289,3 +291,237 @@ async fn test_admin_gc_regions(store_type: &StorageType) {
info!("ADMIN GC_REGIONS test completed successfully");
}
/// Runs the non-happy-path manual-GC suite once for every storage backend
/// selected through the environment.
#[tokio::test]
async fn test_admin_gc_missing_cases_different_store() {
    let _ = dotenv::dotenv();
    common_telemetry::init_default_ut_logging();
    let store_types = StorageType::build_storage_types_based_on_env();
    info!("store type: {:?}", store_types);
    for storage in store_types {
        info!(
            "Running admin GC missing cases test with storage type: {}",
            storage
        );
        test_admin_gc_missing_cases(&storage).await;
    }
}
/// Drives manual GC admin commands through their non-happy paths:
/// invalid table name, non-existent region id, GC with no garbage,
/// multi-region GC after compaction, GC of a dropped (reparted) region,
/// and an immediate re-run to check manual GC is not blocked by cooldown.
async fn test_admin_gc_missing_cases(store_type: &StorageType) {
    let (test_context, _guard) = distributed_with_gc(store_type).await;
    let instance = test_context.frontend();
    let metasrv = test_context.metasrv();
    // Case 1: Invalid table name — GC_TABLE on a missing table must error.
    let invalid_table_sql = "ADMIN GC_TABLE('no_such_table')";
    let invalid_table_result = try_execute_sql(&instance, invalid_table_sql).await;
    assert!(
        invalid_table_result.is_err(),
        "Expected error for invalid table name"
    );
    // Case 2: Invalid region id (same table, non-existent region number)
    let invalid_region_table = "test_admin_gc_invalid_region";
    create_append_table(&instance, invalid_region_table, None).await;
    let table_id = get_table_id(&instance, invalid_region_table).await;
    let region_ids = fetch_region_ids(&instance, invalid_region_table).await;
    let base_region = RegionId::from_u64(*region_ids.first().expect("region id exists"));
    // A region id for the same table but with an out-of-range region number.
    let invalid_region = RegionId::new(table_id, base_region.region_number().saturating_add(100));
    let invalid_region_sql = format!("ADMIN GC_REGIONS({})", invalid_region.as_u64());
    let sst_before_invalid_region = list_sst_files(&test_context).await;
    let invalid_region_result = try_execute_sql(&instance, &invalid_region_sql).await;
    // Whether the call errors or reports one processed region, no SST file
    // may be touched for a region that does not exist.
    match invalid_region_result {
        Ok(output) => {
            let processed_regions = extract_u64_output(output.data).await;
            assert_eq!(processed_regions, 1);
            let sst_after_invalid_region = list_sst_files(&test_context).await;
            assert_eq!(
                sst_before_invalid_region.len(),
                sst_after_invalid_region.len()
            );
        }
        Err(_) => {
            let sst_after_invalid_region = list_sst_files(&test_context).await;
            assert_eq!(
                sst_before_invalid_region.len(),
                sst_after_invalid_region.len()
            );
        }
    }
    // Case 3: No garbage to collect (idempotent) — GC on a freshly flushed
    // table reports the region as processed and deletes nothing.
    let no_garbage_table = "test_admin_gc_no_garbage";
    create_append_table(&instance, no_garbage_table, None).await;
    insert_and_flush(&instance, no_garbage_table, 1).await;
    let sst_before_gc = list_sst_files(&test_context).await;
    let no_garbage_gc_sql = format!("ADMIN GC_TABLE('{no_garbage_table}')");
    let no_garbage_output = execute_sql(&instance, &no_garbage_gc_sql).await;
    let processed_regions = extract_u64_output(no_garbage_output.data).await;
    assert_eq!(processed_regions, 1);
    let sst_after_gc = list_sst_files(&test_context).await;
    assert_eq!(sst_before_gc.len(), sst_after_gc.len());
    // Case 4: Multi-region table, multi-region GC — compaction leaves
    // obsolete SSTs that the subsequent GC_REGIONS call should reclaim.
    let multi_region_table = "test_admin_gc_multi_region";
    let partition_clause =
        Some("PARTITION ON COLUMNS (host) (host < 'm', host >= 'm')".to_string());
    create_append_table(&instance, multi_region_table, partition_clause.as_deref()).await;
    insert_and_flush(&instance, multi_region_table, 4).await;
    let sst_before_compaction = list_sst_files(&test_context).await;
    let compact_sql = format!("ADMIN COMPACT_TABLE('{multi_region_table}')");
    execute_sql(&instance, &compact_sql).await;
    // Give compaction a moment to land its output files before counting.
    tokio::time::sleep(Duration::from_secs(2)).await;
    let sst_after_compaction = list_sst_files(&test_context).await;
    assert!(sst_after_compaction.len() >= sst_before_compaction.len());
    let region_ids = fetch_region_ids(&instance, multi_region_table).await;
    let gc_regions_sql = format!(
        "ADMIN GC_REGIONS({})",
        region_ids
            .iter()
            .map(|id| id.to_string())
            .collect::<Vec<_>>()
            .join(",")
    );
    let gc_regions_output = execute_sql(&instance, &gc_regions_sql).await;
    let processed_regions = extract_u64_output(gc_regions_output.data).await;
    assert_eq!(processed_regions as usize, region_ids.len());
    let sst_after_gc = list_sst_files(&test_context).await;
    // GC must have removed the pre-compaction garbage.
    assert!(sst_after_gc.len() < sst_after_compaction.len());
    // Case 5: Manual GC on dropped region — fabricate a repart mapping so
    // `dropped_region` looks like it was migrated into `dst_region`.
    // NOTE(review): assumes upserting a TableRepartValue is sufficient for
    // the scheduler to classify the region as dropped — confirm against
    // DroppedRegionCollector.
    let dropped_region_table = "test_admin_gc_dropped_region";
    create_append_table(&instance, dropped_region_table, partition_clause.as_deref()).await;
    let dropped_table_id = get_table_id(&instance, dropped_region_table).await;
    let (_routes, regions) =
        super::get_table_route(metasrv.table_metadata_manager(), dropped_table_id).await;
    let base_region = *regions.first().expect("table has at least one region");
    // Use region numbers well past the real ones so neither id collides
    // with a live region of the table.
    let dropped_region = RegionId::new(
        dropped_table_id,
        base_region.region_number().saturating_add(100),
    );
    let dst_region = RegionId::new(
        dropped_table_id,
        base_region.region_number().saturating_add(200),
    );
    let repart_mgr = metasrv.table_metadata_manager().table_repart_manager();
    let current = repart_mgr
        .get_with_raw_bytes(dropped_table_id)
        .await
        .unwrap();
    let mut repart_value = TableRepartValue::new();
    repart_value.update_mappings(dropped_region, &[dst_region]);
    repart_mgr
        .upsert_value(dropped_table_id, current, &repart_value)
        .await
        .unwrap();
    let dropped_gc_sql = format!("ADMIN GC_REGIONS({})", dropped_region.as_u64());
    let dropped_gc_output = execute_sql(&instance, &dropped_gc_sql).await;
    let processed_regions = extract_u64_output(dropped_gc_output.data).await;
    assert_eq!(processed_regions, 1);
    // Case 6: Cooldown bypass for manual GC (should still process regions on immediate re-run)
    let cooldown_region = *region_ids.first().expect("region id exists");
    let cooldown_gc_sql = format!("ADMIN GC_REGIONS({cooldown_region})");
    execute_sql(&instance, &cooldown_gc_sql).await;
    let cooldown_gc_output = execute_sql(&instance, &cooldown_gc_sql).await;
    let processed_regions = extract_u64_output(cooldown_gc_output.data).await;
    assert_eq!(processed_regions, 1);
}
/// Creates an append-mode test table called `table_name`, optionally
/// partitioned via `partition_clause` (pass `None` for a single region).
async fn create_append_table(
    instance: &std::sync::Arc<frontend::instance::Instance>,
    table_name: &str,
    partition_clause: Option<&str>,
) {
    // An absent clause degenerates to an empty string in the CREATE TABLE text.
    let partition_clause = partition_clause.unwrap_or_default();
    let sql = format!(
        "\
        CREATE TABLE {table_name} (\
        ts TIMESTAMP TIME INDEX,\
        val DOUBLE,\
        host STRING\
        ) {partition_clause} WITH (append_mode = 'true')\
        "
    );
    execute_sql(instance, &sql).await;
}
/// Inserts three rows into `table_name` and flushes it, repeating for
/// `rounds` iterations so each round leaves a fresh SST behind.
async fn insert_and_flush(
    instance: &std::sync::Arc<frontend::instance::Instance>,
    table_name: &str,
    rounds: usize,
) {
    // The flush statement is loop-invariant; build it once.
    let flush_sql = format!("ADMIN FLUSH_TABLE('{table_name}')");
    for round in 0..rounds {
        // Day-of-month for the timestamps; single-digit days only, which
        // holds for the small round counts used by these tests.
        let day = round + 1;
        let insert_sql = format!(
            "\
            INSERT INTO {table_name} (ts, val, host) VALUES\
            ('2023-01-0{} 10:00:00', {}, 'host{}'),\
            ('2023-01-0{} 11:00:00', {}, 'host{}'),\
            ('2023-01-0{} 12:00:00', {}, 'host{}')\
            ",
            day,
            10.0 + round as f64,
            round,
            day,
            20.0 + round as f64,
            round,
            day,
            30.0 + round as f64,
            round
        );
        execute_sql(instance, &insert_sql).await;
        execute_sql(instance, &flush_sql).await;
    }
}
/// Returns the region ids (`greptime_partition_id`) of `table_name`,
/// ascending, by querying information_schema.partitions.
async fn fetch_region_ids(
    instance: &std::sync::Arc<frontend::instance::Instance>,
    table_name: &str,
) -> Vec<u64> {
    let region_id_sql = format!(
        "SELECT greptime_partition_id FROM information_schema.partitions WHERE table_name = '{}' ORDER BY greptime_partition_id",
        table_name
    );
    let region_output = execute_sql(instance, &region_id_sql).await;
    let OutputData::Stream(region_stream) = region_output.data else {
        panic!("Expected stream output for region id query");
    };
    let batches = RecordBatches::try_collect(region_stream).await.unwrap();
    let mut region_ids = Vec::new();
    for batch in batches.iter() {
        let array = batch.column(0).as_primitive::<UInt64Type>();
        // Keep only valid (non-null) slots, defensively.
        region_ids.extend(
            (0..array.len())
                .filter(|&idx| array.is_valid(idx))
                .map(|idx| array.value(idx)),
        );
    }
    region_ids
}
/// Looks up the table id of `table_name` in the `greptime`/`public`
/// catalog/schema via the frontend's catalog manager.
/// Panics if the lookup fails or the table does not exist.
async fn get_table_id(
    instance: &std::sync::Arc<frontend::instance::Instance>,
    table_name: &str,
) -> table::metadata::TableId {
    let catalog_manager = instance.catalog_manager();
    let table = catalog_manager
        .table("greptime", "public", table_name, None)
        .await
        .unwrap()
        .unwrap();
    let table_info = table.table_info();
    table_info.table_id()
}
/// Extracts the single u64 value at row 0, column 0 of `output`.
/// Panics on unexpected output kinds or an empty result set.
async fn extract_u64_output(output: OutputData) -> u64 {
    let recordbatches = match output {
        OutputData::RecordBatches(recordbatches) => recordbatches,
        OutputData::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(),
        _ => panic!("Unexpected output type"),
    };
    let batch = recordbatches.iter().next().expect("non-empty recordbatch");
    batch.column(0).as_primitive::<UInt64Type>().value(0)
}