fix: transitive closure for related regions

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-01-28 18:47:28 +08:00
parent 00f568ed28
commit 4ce38bca4e

View File

@@ -280,25 +280,40 @@ impl BatchGcProcedure {
/// The returned map uses the source regions (where those files originally came from) as the key,
/// and the destination regions (where files are currently stored) as the value.
/// If a region is not found in the repartition manager, the returned map still have this region as key,
/// just empty value
/// just empty value.
///
/// This function computes the transitive closure of the destination regions:
/// for each discovered dst_region, it continues to find get_dst_regions(dst_region)
/// until no new regions are discovered.
async fn find_related_regions(
&self,
regions: &[RegionId],
) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
let repart_mgr = self.table_metadata_manager.table_repart_manager();
let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
for src_region in regions {
// TODO(discord9): batch get
if let Some(dst_regions) = repart_mgr
.get_dst_regions(*src_region)
.await
.context(KvBackendSnafu)?
{
related_regions.insert(*src_region, dst_regions.into_iter().collect());
} else {
related_regions.insert(*src_region, Default::default());
let mut all_dst_regions: HashSet<RegionId> = HashSet::new();
let mut pending: Vec<RegionId> = vec![*src_region];
while let Some(current) = pending.pop() {
// TODO(discord9): batch get
if let Some(dst_regions) = repart_mgr
.get_dst_regions(current)
.await
.context(KvBackendSnafu)?
{
for dst in dst_regions {
if all_dst_regions.insert(dst) {
pending.push(dst);
}
}
}
}
related_regions.insert(*src_region, all_dst_regions);
}
Ok(related_regions)
}
@@ -385,7 +400,42 @@ impl BatchGcProcedure {
// have tmp refs are preserved; removing it would lose the repartition trace.
new_value.src_to_dst.insert(src_region, BTreeSet::new());
} else {
new_value.src_to_dst.remove(&src_region);
// Before removing a mapping, we must check if any transitive destination
// still has downstream mappings. This handles the chain case:
//
// Example: A → B → C (files moved from A to B, then from B to C)
// - Stored mappings: A → {B}, B → {C}
// - GC batch only includes [A]
// - A has no cross_refs or tmp_refs (appears clean)
//
// Problem: If we remove A → {B} now, future GC of A won't find C
// (the transitive destination where A's files may still exist).
//
// Solution: Check if any direct destination (B) still has its own
// downstream mapping (B → {C}). If so, the chain is still active
// and we must preserve A → {B} as a tombstone.
//
// Note: We only check direct destinations (not full transitive closure)
// because:
// 1. If B → {C} exists, B's cleanup will preserve it until C is clean
// 2. This creates a "domino effect" - each edge is preserved until
// its direct downstream is clean
// 3. Eventually the entire chain collapses when the leaf is cleaned
//
// We use `new_value.src_to_dst` (the in-progress state) to check if
// a dst region still has downstream mappings in the current table.
let direct_dsts = new_value.src_to_dst.get(&src_region);
let any_dst_has_downstream = direct_dsts
.map(|dsts| {
dsts.iter()
.any(|dst| new_value.src_to_dst.contains_key(dst))
})
.unwrap_or(false);
if !any_dst_has_downstream {
new_value.src_to_dst.remove(&src_region);
}
// Otherwise keep original mapping (e.g., A → {B}) unchanged
}
}