diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index e5f8595786..c1eb600fd8 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -420,13 +420,18 @@ where /// Instruction to get file references for specified regions. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct GetFileRefs { - /// List of region IDs to get file references for. - pub region_ids: Vec, + /// List of region IDs to get file references from active FileHandles (in-memory). + pub query_regions: Vec, + /// Mapping from the source region ID (where to read the manifest) to + /// the target region IDs (whose file references to look for). + /// Key: The region ID of the manifest. + /// Value: The list of region IDs to find references for in that manifest. + pub related_regions: HashMap>, } impl Display for GetFileRefs { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "GetFileRefs(region_ids={:?})", self.region_ids) + write!(f, "GetFileRefs(region_ids={:?})", self.query_regions) } } diff --git a/src/datanode/src/heartbeat/handler/file_ref.rs b/src/datanode/src/heartbeat/handler/file_ref.rs index 433970a7c4..4d2ac325a8 100644 --- a/src/datanode/src/heartbeat/handler/file_ref.rs +++ b/src/datanode/src/heartbeat/handler/file_ref.rs @@ -40,7 +40,7 @@ impl InstructionHandler for GetFileRefsHandler { })); }; match mito_engine - .get_snapshot_of_file_refs(get_file_refs.region_ids) + .get_snapshot_of_file_refs(get_file_refs.query_regions, get_file_refs.related_regions) .await { Ok(all_file_refs) => { diff --git a/src/meta-srv/src/gc/ctx.rs b/src/meta-srv/src/gc/ctx.rs index ca5fbafce3..848e7f2c28 100644 --- a/src/meta-srv/src/gc/ctx.rs +++ b/src/meta-srv/src/gc/ctx.rs @@ -50,7 +50,8 @@ pub(crate) trait SchedulerCtx: Send + Sync { async fn get_file_references( &self, - region_ids: &[RegionId], + query_regions: &[RegionId], + related_regions: HashMap>, region_routes: &Region2Peers, timeout: Duration, ) -> Result; @@ -168,24 +169,28 @@ impl SchedulerCtx for DefaultGcSchedulerCtx { async fn get_file_references( &self, - region_ids: &[RegionId], + query_regions: &[RegionId], + related_regions: HashMap>, region_routes: &Region2Peers, timeout: Duration, ) -> Result { - debug!("Getting file references for {} regions", region_ids.len()); + debug!( + "Getting file references for {} regions", + query_regions.len() + ); // Group regions by datanode to minimize RPC calls - let mut datanode_regions: HashMap> = HashMap::new(); + let mut datanode2query_regions: HashMap> = HashMap::new(); - for region_id in region_ids { + for region_id in query_regions { if let Some((leader, followers)) = region_routes.get(region_id) { - datanode_regions + datanode2query_regions .entry(leader.clone()) .or_default() .push(*region_id); // also need to send for follower regions for file refs in case query is running on follower for follower in followers { - datanode_regions + datanode2query_regions .entry(follower.clone()) .or_default() .push(*region_id); @@ -199,14 +204,25 @@ impl SchedulerCtx for DefaultGcSchedulerCtx { .fail(); } } + let mut datanode2related_regions: HashMap>> = + HashMap::new(); + for (related_region, queries) in related_regions { + if let Some((leader, followers)) = region_routes.get(&related_region) { + datanode2related_regions + .entry(leader.clone()) + .or_default() + .insert(related_region, queries.clone()); + } // since read from manifest, no need to send to followers + } // Send GetFileRefs instructions to each datanode let mut all_file_refs: HashMap> = HashMap::new(); let mut all_manifest_versions = HashMap::new(); - for (peer, regions) in datanode_regions { + for (peer, regions) in datanode2query_regions { + let related_regions = datanode2related_regions.remove(&peer).unwrap_or_default(); match self - .send_get_file_refs_instruction(&peer, ®ions, timeout) + .send_get_file_refs_instruction(&peer, ®ions, related_regions, timeout) .await { Ok(manifest) => { @@ -301,17 +317,19 @@ impl DefaultGcSchedulerCtx { async fn send_get_file_refs_instruction( &self, peer: &Peer, - region_ids: &[RegionId], + query_regions: &[RegionId], + related_regions: HashMap>, timeout: Duration, ) -> Result { debug!( "Sending GetFileRefs instruction to datanode {} for {} regions", peer, - region_ids.len() + query_regions.len() ); let instruction = Instruction::GetFileRefs(GetFileRefs { - region_ids: region_ids.to_vec(), + query_regions: query_regions.to_vec(), + related_regions, }); let reply = self diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index df7e11dbd3..456d33a7f5 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -292,19 +292,29 @@ impl MitoEngine { /// Get all tmp ref files for given region ids, excluding files that's already in manifest. pub async fn get_snapshot_of_file_refs( &self, - region_ids: impl IntoIterator, + file_handle_regions: impl IntoIterator, + manifest_regions: HashMap>, ) -> Result { let file_ref_mgr = self.file_ref_manager(); - let region_ids = region_ids.into_iter().collect::>(); + let file_handle_regions = file_handle_regions.into_iter().collect::>(); // Convert region IDs to MitoRegionRef objects, ignore regions that do not exist on current datanode // as regions on other datanodes are not managed by this engine. - let regions: Vec = region_ids + let query_regions: Vec = file_handle_regions .into_iter() .filter_map(|region_id| self.find_region(region_id)) .collect(); - file_ref_mgr.get_snapshot_of_file_refs(regions).await + let related_regions: Vec<(MitoRegionRef, Vec)> = manifest_regions + .into_iter() + .filter_map(|(related_region, queries)| { + self.find_region(related_region).map(|r| (r, queries)) + }) + .collect(); + + file_ref_mgr + .get_snapshot_of_file_refs(query_regions, related_regions) + .await } /// Returns true if the specific region exists. diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index f98b439879..a281aeb5fa 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -81,13 +81,15 @@ impl FileReferenceManager { } /// Gets all ref files for the given regions, meaning all open FileHandles for those regions - #[allow(unused)] + /// and from related regions' manifests. pub(crate) async fn get_snapshot_of_file_refs( &self, - regions: Vec, + query_regions: Vec, + related_regions: Vec<(MitoRegionRef, Vec)>, ) -> Result { let mut ref_files = HashMap::new(); - for region_id in regions.iter().map(|r| r.region_id()) { + // get from in memory file handles + for region_id in query_regions.iter().map(|r| r.region_id()) { if let Some(files) = self.ref_file_set(region_id) { ref_files.insert(region_id, files.into_iter().map(|f| f.file_id).collect()); } @@ -95,12 +97,27 @@ impl FileReferenceManager { let mut manifest_version = HashMap::new(); - for r in ®ions { + for r in &query_regions { let manifest = r.manifest_ctx.manifest().await; - let files = manifest.files.keys().cloned().collect::>(); manifest_version.insert(r.region_id(), manifest.manifest_version); } + // get file refs from related regions' manifests + for (related_region, queries) in &related_regions { + let queries = queries.iter().cloned().collect::>(); + let manifest = related_region.manifest_ctx.manifest().await; + for meta in manifest.files.values() { + if queries.contains(&meta.region_id) { + ref_files + .entry(meta.region_id) + .or_insert_with(HashSet::new) + .insert(meta.file_id); + } + } + // not sure if related region's manifest version is needed, but record it for now. + manifest_version.insert(related_region.region_id(), manifest.manifest_version); + } + // simply return all ref files, no manifest version filtering for now. Ok(FileRefsManifest { file_refs: ref_files,