mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 20:10:37 +00:00
@@ -386,6 +386,11 @@ impl BatchGcProcedure {
|
||||
.map(|v| (**v).clone())
|
||||
.unwrap_or_else(TableRepartValue::new);
|
||||
|
||||
let original_src_to_dst = current
|
||||
.as_ref()
|
||||
.map(|v| v.src_to_dst.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
// We only touch regions involved in this GC batch for the current table to avoid
|
||||
// clobbering unrelated repart entries. Start from the batch regions of this table.
|
||||
let batch_src_regions: HashSet<RegionId> = self
|
||||
@@ -436,14 +441,13 @@ impl BatchGcProcedure {
|
||||
// its direct downstream is clean
|
||||
// 3. Eventually the entire chain collapses when the leaf is cleaned
|
||||
//
|
||||
// We use `new_value.src_to_dst` (the in-progress state) to check if
|
||||
// a dst region still has downstream mappings in the current table.
|
||||
let direct_dsts = new_value.src_to_dst.get(&src_region);
|
||||
// We use `original_src_to_dst` (the snapshot before any modifications) to
|
||||
// check if a dst region still has downstream mappings. This ensures the
|
||||
// check is order-independent: even if B is processed before A and removed
|
||||
// from new_value, A's check still sees B in the original snapshot.
|
||||
let direct_dsts = original_src_to_dst.get(&src_region);
|
||||
let any_dst_has_downstream = direct_dsts
|
||||
.map(|dsts| {
|
||||
dsts.iter()
|
||||
.any(|dst| new_value.src_to_dst.contains_key(dst))
|
||||
})
|
||||
.map(|dsts| dsts.iter().any(|dst| original_src_to_dst.contains_key(dst)))
|
||||
.unwrap_or(false);
|
||||
|
||||
if !any_dst_has_downstream {
|
||||
|
||||
@@ -284,3 +284,82 @@ async fn test_cleanup_region_repartition_remove_when_tmp_refs_empty() {
|
||||
Some(&BTreeSet::from([dst_b_initial]))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cleanup_region_repartition_transitive_chain() {
|
||||
let _ = dotenv::dotenv();
|
||||
let (test_context, _guard) = distributed_with_gc(&StorageType::File).await;
|
||||
let instance = test_context.frontend();
|
||||
let metasrv = test_context.metasrv();
|
||||
|
||||
let create_table_sql = r#"
|
||||
CREATE TABLE test_cleanup_repartition_chain (
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
val DOUBLE,
|
||||
host STRING
|
||||
)PARTITION ON COLUMNS (host) (
|
||||
host < 'a',
|
||||
host >= 'a' AND host < 'm',
|
||||
host >= 'm'
|
||||
) WITH (append_mode = 'true')
|
||||
"#;
|
||||
execute_sql(&instance, create_table_sql).await;
|
||||
|
||||
let table = instance
|
||||
.catalog_manager()
|
||||
.table(
|
||||
"greptime",
|
||||
"public",
|
||||
"test_cleanup_repartition_chain",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let table_id = table.table_info().table_id();
|
||||
|
||||
let (_routes, regions) = get_table_route(metasrv.table_metadata_manager(), table_id).await;
|
||||
let base_region = *regions.first().expect("table has at least one region");
|
||||
|
||||
let region_a = base_region;
|
||||
let region_b = RegionId::new(table_id, base_region.region_number() + 1);
|
||||
let region_c = RegionId::new(table_id, base_region.region_number() + 2);
|
||||
|
||||
let repart_mgr = metasrv.table_metadata_manager().table_repart_manager();
|
||||
let current = repart_mgr.get_with_raw_bytes(table_id).await.unwrap();
|
||||
let mut initial_value = TableRepartValue::new();
|
||||
initial_value.update_mappings(region_a, &[region_b]);
|
||||
initial_value.update_mappings(region_b, &[region_c]);
|
||||
repart_mgr
|
||||
.upsert_value(table_id, current, &initial_value)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let manifest = FileRefsManifest::default();
|
||||
|
||||
let regions_to_gc = vec![region_a, region_b];
|
||||
|
||||
let mut procedure = BatchGcProcedure::new_update_repartition_for_test(
|
||||
metasrv.mailbox().clone(),
|
||||
metasrv.table_metadata_manager().clone(),
|
||||
metasrv.options().grpc.server_addr.clone(),
|
||||
regions_to_gc,
|
||||
manifest,
|
||||
Duration::from_secs(5),
|
||||
);
|
||||
|
||||
let procedure_ctx = new_test_procedure_context();
|
||||
let status = procedure.execute(&procedure_ctx).await.unwrap();
|
||||
assert!(matches!(status, Status::Done { .. }));
|
||||
|
||||
let repart_after = repart_mgr.get(table_id).await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
repart_after.src_to_dst.get(®ion_a),
|
||||
Some(&BTreeSet::from([region_b])),
|
||||
"A → {{B}} should be preserved because B had downstream (domino effect)"
|
||||
);
|
||||
assert!(
|
||||
!repart_after.src_to_dst.contains_key(®ion_b),
|
||||
"B → {{C}} should be removed because C has no downstream"
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user