feat: gc get ref from manifest (#7260)

feat: get file ref from other manifest

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-11-19 20:13:28 +08:00
committed by GitHub
parent e59612043d
commit 0cee4fa115
5 changed files with 75 additions and 25 deletions

View File

@@ -420,13 +420,18 @@ where
/// Instruction to get file references for specified regions.
///
/// References can be requested from two sources: active in-memory file handles
/// (`query_regions`) and region manifests (`related_regions`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefs {
/// List of region IDs to get file references for.
pub region_ids: Vec<RegionId>,
/// List of region IDs to get file references from active FileHandles (in-memory).
pub query_regions: Vec<RegionId>,
/// Mapping from the source region ID (where to read the manifest) to
/// the target region IDs (whose file references to look for).
///
/// - Key: the region ID whose manifest is read.
/// - Value: the list of region IDs to find references for in that manifest.
pub related_regions: HashMap<RegionId, Vec<RegionId>>,
}
// Human-readable rendering of the instruction; prints a single list of region IDs.
impl Display for GetFileRefs {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// NOTE(review): the output label reads `region_ids` even where the value printed is
// `query_regions`, and `related_regions` does not appear in the output at all.
// Confirm whether the label should follow the field name and whether the
// related regions should be included.
write!(f, "GetFileRefs(region_ids={:?})", self.region_ids)
write!(f, "GetFileRefs(region_ids={:?})", self.query_regions)
}
}

View File

@@ -40,7 +40,7 @@ impl InstructionHandler for GetFileRefsHandler {
}));
};
match mito_engine
.get_snapshot_of_file_refs(get_file_refs.region_ids)
.get_snapshot_of_file_refs(get_file_refs.query_regions, get_file_refs.related_regions)
.await
{
Ok(all_file_refs) => {

View File

@@ -50,7 +50,8 @@ pub(crate) trait SchedulerCtx: Send + Sync {
/// Gets the file references for the given regions.
///
/// * `query_regions` - regions to get file references for.
/// * `related_regions` - maps a region whose manifest is read to the regions
///   whose file references are looked up in that manifest.
/// * `region_routes` - routing table used to find the peers of a region.
/// * `timeout` - time limit for the operation (NOTE(review): exact semantics are
///   implementation-defined; confirm against the implementor).
///
/// Returns the collected [`FileRefsManifest`] or an error.
async fn get_file_references(
&self,
region_ids: &[RegionId],
query_regions: &[RegionId],
related_regions: HashMap<RegionId, Vec<RegionId>>,
region_routes: &Region2Peers,
timeout: Duration,
) -> Result<FileRefsManifest>;
@@ -168,24 +169,28 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
async fn get_file_references(
&self,
region_ids: &[RegionId],
query_regions: &[RegionId],
related_regions: HashMap<RegionId, Vec<RegionId>>,
region_routes: &Region2Peers,
timeout: Duration,
) -> Result<FileRefsManifest> {
debug!("Getting file references for {} regions", region_ids.len());
debug!(
"Getting file references for {} regions",
query_regions.len()
);
// Group regions by datanode to minimize RPC calls
let mut datanode_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
for region_id in region_ids {
for region_id in query_regions {
if let Some((leader, followers)) = region_routes.get(region_id) {
datanode_regions
datanode2query_regions
.entry(leader.clone())
.or_default()
.push(*region_id);
// Also send the request to the followers' datanodes to collect file refs, in case a query is running on a follower.
for follower in followers {
datanode_regions
datanode2query_regions
.entry(follower.clone())
.or_default()
.push(*region_id);
@@ -199,14 +204,25 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
.fail();
}
}
let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
HashMap::new();
for (related_region, queries) in related_regions {
if let Some((leader, followers)) = region_routes.get(&related_region) {
datanode2related_regions
.entry(leader.clone())
.or_default()
.insert(related_region, queries.clone());
} // since read from manifest, no need to send to followers
}
// Send GetFileRefs instructions to each datanode
let mut all_file_refs: HashMap<RegionId, HashSet<FileId>> = HashMap::new();
let mut all_manifest_versions = HashMap::new();
for (peer, regions) in datanode_regions {
for (peer, regions) in datanode2query_regions {
let related_regions = datanode2related_regions.remove(&peer).unwrap_or_default();
match self
.send_get_file_refs_instruction(&peer, &regions, timeout)
.send_get_file_refs_instruction(&peer, &regions, related_regions, timeout)
.await
{
Ok(manifest) => {
@@ -301,17 +317,19 @@ impl DefaultGcSchedulerCtx {
async fn send_get_file_refs_instruction(
&self,
peer: &Peer,
region_ids: &[RegionId],
query_regions: &[RegionId],
related_regions: HashMap<RegionId, Vec<RegionId>>,
timeout: Duration,
) -> Result<FileRefsManifest> {
debug!(
"Sending GetFileRefs instruction to datanode {} for {} regions",
peer,
region_ids.len()
query_regions.len()
);
let instruction = Instruction::GetFileRefs(GetFileRefs {
region_ids: region_ids.to_vec(),
query_regions: query_regions.to_vec(),
related_regions,
});
let reply = self

View File

@@ -292,19 +292,29 @@ impl MitoEngine {
/// Gets a snapshot of the temporary file references for the given regions.
///
/// * `file_handle_regions` - region IDs whose references are collected by the
///   file reference manager.
/// * `manifest_regions` - maps a region whose manifest is read to the region IDs
///   to look for in that manifest.
///
/// Region IDs that cannot be found on this engine are silently skipped in both inputs.
///
/// NOTE(review): the description of this function states that files that are already
/// in the manifest are excluded; no such filtering happens in this function itself —
/// confirm in the file reference manager.
pub async fn get_snapshot_of_file_refs(
&self,
region_ids: impl IntoIterator<Item = RegionId>,
file_handle_regions: impl IntoIterator<Item = RegionId>,
manifest_regions: HashMap<RegionId, Vec<RegionId>>,
) -> Result<FileRefsManifest> {
let file_ref_mgr = self.file_ref_manager();
let region_ids = region_ids.into_iter().collect::<Vec<_>>();
let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
// Convert region IDs to MitoRegionRef objects, ignoring regions that do not exist on the
// current datanode, as regions on other datanodes are not managed by this engine.
let regions: Vec<MitoRegionRef> = region_ids
let query_regions: Vec<MitoRegionRef> = file_handle_regions
.into_iter()
.filter_map(|region_id| self.find_region(region_id))
.collect();
file_ref_mgr.get_snapshot_of_file_refs(regions).await
// Resolve the manifest-source regions the same way: an entry is dropped when its
// key region is not found on this engine; the queried region IDs are kept as-is.
let related_regions: Vec<(MitoRegionRef, Vec<RegionId>)> = manifest_regions
.into_iter()
.filter_map(|(related_region, queries)| {
self.find_region(related_region).map(|r| (r, queries))
})
.collect();
file_ref_mgr
.get_snapshot_of_file_refs(query_regions, related_regions)
.await
}
/// Returns true if the specific region exists.

View File

@@ -81,13 +81,15 @@ impl FileReferenceManager {
}
/// Gets all ref files for the given regions, meaning all open FileHandles for those regions
#[allow(unused)]
/// and from related regions' manifests.
pub(crate) async fn get_snapshot_of_file_refs(
&self,
regions: Vec<MitoRegionRef>,
query_regions: Vec<MitoRegionRef>,
related_regions: Vec<(MitoRegionRef, Vec<RegionId>)>,
) -> Result<FileRefsManifest> {
let mut ref_files = HashMap::new();
for region_id in regions.iter().map(|r| r.region_id()) {
// get from in memory file handles
for region_id in query_regions.iter().map(|r| r.region_id()) {
if let Some(files) = self.ref_file_set(region_id) {
ref_files.insert(region_id, files.into_iter().map(|f| f.file_id).collect());
}
@@ -95,12 +97,27 @@ impl FileReferenceManager {
let mut manifest_version = HashMap::new();
for r in &regions {
for r in &query_regions {
let manifest = r.manifest_ctx.manifest().await;
let files = manifest.files.keys().cloned().collect::<Vec<_>>();
manifest_version.insert(r.region_id(), manifest.manifest_version);
}
// get file refs from related regions' manifests
for (related_region, queries) in &related_regions {
let queries = queries.iter().cloned().collect::<HashSet<_>>();
let manifest = related_region.manifest_ctx.manifest().await;
for meta in manifest.files.values() {
if queries.contains(&meta.region_id) {
ref_files
.entry(meta.region_id)
.or_insert_with(HashSet::new)
.insert(meta.file_id);
}
}
// not sure if related region's manifest version is needed, but record it for now.
manifest_version.insert(related_region.region_id(), manifest.manifest_version);
}
// simply return all ref files, no manifest version filtering for now.
Ok(FileRefsManifest {
file_refs: ref_files,