diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index d581e3a295..ef7a79c61d 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt::{Display, Formatter};
 use std::time::Duration;
 
@@ -432,11 +432,11 @@ where
 pub struct GetFileRefs {
     /// List of region IDs to get file references from active FileHandles (in-memory).
     pub query_regions: Vec<RegionId>,
-    /// Mapping from the source region ID (where to read the manifest) to
-    /// the target region IDs (whose file references to look for).
-    /// Key: The region ID of the manifest.
-    /// Value: The list of region IDs to find references for in that manifest.
-    pub related_regions: HashMap<RegionId, Vec<RegionId>>,
+    /// Mapping from source region IDs (whose file references to look for) to
+    /// destination region IDs (where to read the manifests).
+    /// Key: a source region ID (where the files originally came from).
+    /// Value: the set of destination region IDs (whose manifests need to be read).
+    pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
 }
 
 impl Display for GetFileRefs {
diff --git a/src/meta-srv/src/gc.rs b/src/meta-srv/src/gc.rs
index d8c0adb204..b1bed015f3 100644
--- a/src/meta-srv/src/gc.rs
+++ b/src/meta-srv/src/gc.rs
@@ -12,9 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// TODO(discord9): remove this once gc scheduler is fully merged
-#![allow(unused)]
-
 use std::collections::{HashMap, HashSet};
 
 use common_meta::peer::Peer;
@@ -29,6 +26,7 @@ mod options;
 mod procedure;
 mod scheduler;
 mod tracker;
+mod util;
 
 pub use options::GcSchedulerOptions;
 pub use procedure::BatchGcProcedure;
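Note the orientation flip on `GetFileRefs::related_regions`: the key is now the source region (original file owner) and the value is the set of destination manifests to scan, rather than the other way around. A minimal sketch of how a receiver would interpret the new shape (with `RegionId` simplified to a `u64` alias; everything else here is illustrative, not the real API):

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-in for store_api::storage::RegionId.
type RegionId = u64;

fn main() {
    // Region 1 was repartitioned; its files may now live in the manifests
    // of regions 2 and 3. Under the new orientation the *source* region is
    // the key and the *destination* manifests are the values.
    let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
    related_regions.insert(1, HashSet::from([2, 3]));

    // A datanode answering GetFileRefs reads the manifests in the value set
    // and reports any files still owned by the key region.
    for (src, dsts) in &related_regions {
        for dst in dsts {
            println!("scan manifest of region {dst} for files of region {src}");
        }
    }
}
```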
diff --git a/src/meta-srv/src/gc/ctx.rs b/src/meta-srv/src/gc/ctx.rs
index a4c75d4cd6..c1bd3a372c 100644
--- a/src/meta-srv/src/gc/ctx.rs
+++ b/src/meta-srv/src/gc/ctx.rs
@@ -12,29 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::time::Duration;
 
-use api::v1::meta::MailboxMessage;
 use common_meta::datanode::RegionStat;
-use common_meta::instruction::{
-    GcRegions, GetFileRefs, GetFileRefsReply, Instruction, InstructionReply,
-};
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::key::table_route::PhysicalTableRouteValue;
-use common_meta::peer::Peer;
 use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
-use common_telemetry::{debug, error, warn};
+use common_telemetry::debug;
 use snafu::{OptionExt as _, ResultExt as _};
-use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
+use store_api::storage::{GcReport, RegionId};
 use table::metadata::TableId;
 
 use crate::cluster::MetaPeerClientRef;
-use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
-use crate::gc::Region2Peers;
-use crate::gc::procedure::{BatchGcProcedure, GcRegionProcedure};
-use crate::handler::HeartbeatMailbox;
-use crate::service::mailbox::{Channel, MailboxRef};
+use crate::error::{self, Result, TableMetadataManagerSnafu};
+use crate::gc::procedure::BatchGcProcedure;
+use crate::service::mailbox::MailboxRef;
 
 #[async_trait::async_trait]
 pub(crate) trait SchedulerCtx: Send + Sync {
@@ -45,19 +38,9 @@ pub(crate) trait SchedulerCtx: Send + Sync {
         table_id: TableId,
     ) -> Result<(TableId, PhysicalTableRouteValue)>;
 
-    async fn get_file_references(
-        &self,
-        query_regions: &[RegionId],
-        related_regions: HashMap<RegionId, Vec<RegionId>>,
-        region_routes: &Region2Peers,
-        timeout: Duration,
-    ) -> Result<FileRefsManifest>;
-
     async fn gc_regions(
         &self,
-        peer: Peer,
         region_ids: &[RegionId],
-        file_refs_manifest: &FileRefsManifest,
         full_file_listing: bool,
         timeout: Duration,
     ) -> Result<GcReport>;
@@ -100,7 +83,7 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
         let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
         let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
         for (_dn_id, stats) in dn_stats {
-            let mut stats = stats.stats;
+            let stats = stats.stats;
             let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned()
             else {
                 continue;
@@ -129,142 +112,34 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
 
     async fn gc_regions(
         &self,
-        peer: Peer,
         region_ids: &[RegionId],
-        file_refs_manifest: &FileRefsManifest,
         full_file_listing: bool,
         timeout: Duration,
     ) -> Result<GcReport> {
-        self.gc_regions_inner(
-            peer,
-            region_ids,
-            file_refs_manifest,
-            full_file_listing,
-            timeout,
-        )
-        .await
-    }
-
-    async fn get_file_references(
-        &self,
-        query_regions: &[RegionId],
-        related_regions: HashMap<RegionId, Vec<RegionId>>,
-        region_routes: &Region2Peers,
-        timeout: Duration,
-    ) -> Result<FileRefsManifest> {
-        debug!(
-            "Getting file references for {} regions",
-            query_regions.len()
-        );
-
-        // Group regions by datanode to minimize RPC calls
-        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
-
-        for region_id in query_regions {
-            if let Some((leader, followers)) = region_routes.get(region_id) {
-                datanode2query_regions
-                    .entry(leader.clone())
-                    .or_default()
-                    .push(*region_id);
-                // also need to send for follower regions for file refs in case query is running on follower
-                for follower in followers {
-                    datanode2query_regions
-                        .entry(follower.clone())
-                        .or_default()
-                        .push(*region_id);
-                }
-            } else {
-                return error::UnexpectedSnafu {
-                    violated: format!(
-                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
-                    ),
-                }
-                .fail();
-            }
-        }
-        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
-            HashMap::new();
-        for (related_region, queries) in related_regions {
-            if let Some((leader, followers)) = region_routes.get(&related_region) {
-                datanode2related_regions
-                    .entry(leader.clone())
-                    .or_default()
-                    .insert(related_region, queries.clone());
-            } // since read from manifest, no need to send to followers
-        }
-
-        // Send GetFileRefs instructions to each datanode
-        let mut all_file_refs: HashMap<RegionId, HashSet<FileRef>> = HashMap::new();
-        let mut all_manifest_versions = HashMap::new();
-
-        for (peer, regions) in datanode2query_regions {
-            let related_regions = datanode2related_regions.remove(&peer).unwrap_or_default();
-            match self
-                .send_get_file_refs_instruction(&peer, &regions, related_regions, timeout)
-                .await
-            {
-                Ok(manifest) => {
-                    // TODO(discord9): if other regions provide file refs for one region on other datanode, and no version,
-                    // is it correct to merge manifest_version directly?
-                    // FIXME: follower region how to merge version???
-
-                    for (region_id, file_refs) in manifest.file_refs {
-                        all_file_refs
-                            .entry(region_id)
-                            .or_default()
-                            .extend(file_refs);
-                    }
-                    // region manifest version should be the smallest one among all peers, so outdated region can be detected
-                    for (region_id, version) in manifest.manifest_version {
-                        let entry = all_manifest_versions.entry(region_id).or_insert(version);
-                        *entry = (*entry).min(version);
-                    }
-                }
-                Err(e) => {
-                    warn!(
-                        "Failed to get file refs from datanode {}: {}. Skipping regions on this datanode.",
-                        peer, e
-                    );
-                    // Continue processing other datanodes instead of failing the entire operation
-                    continue;
-                }
-            }
-        }
-
-        Ok(FileRefsManifest {
-            file_refs: all_file_refs,
-            manifest_version: all_manifest_versions,
-        })
+        self.gc_regions_inner(region_ids, full_file_listing, timeout)
+            .await
     }
 }
 
 impl DefaultGcSchedulerCtx {
     async fn gc_regions_inner(
         &self,
-        peer: Peer,
         region_ids: &[RegionId],
-        file_refs_manifest: &FileRefsManifest,
         full_file_listing: bool,
         timeout: Duration,
     ) -> Result<GcReport> {
         debug!(
-            "Sending GC instruction to datanode {} for {} regions (full_file_listing: {})",
-            peer,
+            "Sending GC instruction for {} regions (full_file_listing: {})",
             region_ids.len(),
             full_file_listing
         );
 
-        let gc_regions = GcRegions {
-            regions: region_ids.to_vec(),
-            file_refs_manifest: file_refs_manifest.clone(),
-            full_file_listing,
-        };
-        let procedure = GcRegionProcedure::new(
+        let procedure = BatchGcProcedure::new(
             self.mailbox.clone(),
+            self.table_metadata_manager.clone(),
             self.server_addr.clone(),
-            peer,
-            gc_regions,
-            format!("GC for {} regions", region_ids.len()),
+            region_ids.to_vec(),
+            full_file_listing,
             timeout,
         );
         let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -285,96 +160,8 @@ impl DefaultGcSchedulerCtx {
             ),
         })?;
 
-        let gc_report = GcRegionProcedure::cast_result(res)?;
+        let gc_report = BatchGcProcedure::cast_result(res)?;
 
         Ok(gc_report)
     }
-
-    /// TODO(discord9): add support to read manifest of related regions for file refs too
-    /// (now it's only reading active FileHandles)
-    async fn send_get_file_refs_instruction(
-        &self,
-        peer: &Peer,
-        query_regions: &[RegionId],
-        related_regions: HashMap<RegionId, Vec<RegionId>>,
-        timeout: Duration,
-    ) -> Result<FileRefsManifest> {
-        debug!(
-            "Sending GetFileRefs instruction to datanode {} for {} regions",
-            peer,
-            query_regions.len()
-        );
-
-        let instruction = Instruction::GetFileRefs(GetFileRefs {
-            query_regions: query_regions.to_vec(),
-            related_regions,
-        });
-
-        let reply = self
-            .send_instruction(peer, instruction, "Get file references", timeout)
-            .await?;
-
-        let InstructionReply::GetFileRefs(GetFileRefsReply {
-            file_refs_manifest,
-            success,
-            error,
-        }) = reply
-        else {
-            return error::UnexpectedInstructionReplySnafu {
-                mailbox_message: format!("{:?}", reply),
-                reason: "Unexpected reply of the GetFileRefs instruction",
-            }
-            .fail();
-        };
-
-        if !success {
-            return error::UnexpectedSnafu {
-                violated: format!(
-                    "Failed to get file references from datanode {}: {:?}",
-                    peer, error
-                ),
-            }
-            .fail();
-        }
-
-        Ok(file_refs_manifest)
-    }
-
-    async fn send_instruction(
-        &self,
-        peer: &Peer,
-        instruction: Instruction,
-        description: &str,
-        timeout: Duration,
-    ) -> Result<InstructionReply> {
-        let msg = MailboxMessage::json_message(
-            &format!("{}: {}", description, instruction),
-            &format!("Metasrv@{}", self.server_addr),
-            &format!("Datanode-{}@{}", peer.id, peer.addr),
-            common_time::util::current_time_millis(),
-            &instruction,
-        )
-        .with_context(|_| error::SerializeToJsonSnafu {
-            input: instruction.to_string(),
-        })?;
-
-        let mailbox_rx = self
-            .mailbox
-            .send(&Channel::Datanode(peer.id), msg, timeout)
-            .await?;
-
-        match mailbox_rx.await {
-            Ok(reply_msg) => {
-                let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
-                Ok(reply)
-            }
-            Err(e) => {
-                error!(
-                    "Failed to receive reply from datanode {} for {}: {}",
-                    peer, description, e
-                );
-                Err(e)
-            }
-        }
-    }
 }
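The mailbox plumbing removed above now lives inside `BatchGcProcedure` (see `procedure.rs` below). The pattern that survives the move is grouping regions under their leader peer so one instruction per datanode covers many regions. A self-contained sketch of that grouping, with `Peer` and `RegionId` reduced to `u64` stand-ins:

```rust
use std::collections::HashMap;

// Simplified stand-ins for common_meta::peer::Peer and store_api::storage::RegionId.
type Peer = u64;
type RegionId = u64;

/// Group each region under its leader peer so a single instruction per
/// datanode can cover many regions (the pattern used throughout this PR).
fn group_by_leader(
    routes: &HashMap<RegionId, Peer>,
    regions: &[RegionId],
) -> HashMap<Peer, Vec<RegionId>> {
    let mut by_peer: HashMap<Peer, Vec<RegionId>> = HashMap::new();
    for region in regions {
        if let Some(leader) = routes.get(region) {
            by_peer.entry(*leader).or_default().push(*region);
        }
        // Regions without a known route are skipped in this sketch; the real
        // code either errors out or logs a warning and continues.
    }
    by_peer
}

fn main() {
    let routes = HashMap::from([(1, 100), (2, 100), (3, 200)]);
    let grouped = group_by_leader(&routes, &[1, 2, 3]);
    assert_eq!(grouped[&100].len(), 2);
    assert_eq!(grouped[&200], vec![3]);
}
```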
diff --git a/src/meta-srv/src/gc/handler.rs b/src/meta-srv/src/gc/handler.rs
index 4085f6289c..ddf6d3d977 100644
--- a/src/meta-srv/src/gc/handler.rs
+++ b/src/meta-srv/src/gc/handler.rs
@@ -15,24 +15,17 @@
 use std::collections::{HashMap, HashSet};
 use std::time::Instant;
 
-use common_meta::key::table_route::PhysicalTableRouteValue;
 use common_meta::peer::Peer;
 use common_telemetry::{debug, error, info, warn};
 use futures::StreamExt;
 use itertools::Itertools;
-use store_api::storage::{FileRefsManifest, GcReport, RegionId};
+use store_api::storage::{GcReport, RegionId};
 use table::metadata::TableId;
-use tokio::time::sleep;
 
 use crate::error::Result;
 use crate::gc::candidate::GcCandidate;
 use crate::gc::scheduler::{GcJobReport, GcScheduler};
 use crate::gc::tracker::RegionGcInfo;
-use crate::region;
-
-pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
-
-pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;
 
 impl GcScheduler {
     /// Iterate through all region stats, find region that might need gc, and send gc instruction to
@@ -61,6 +54,8 @@ impl GcScheduler {
             .aggregate_candidates_by_datanode(per_table_candidates)
             .await?;
 
+        // TODO(discord9): add deleted regions from repartition mapping
+
         if datanode_to_candidates.is_empty() {
             info!("No valid datanode candidates found, skipping GC cycle");
             return Ok(Default::default());
@@ -83,17 +78,6 @@ impl GcScheduler {
         Ok(report)
     }
 
-    /// Find related regions that might share files with the candidate regions.
-    /// Currently returns the same regions since repartition is not implemented yet.
-    /// TODO(discord9): When repartition is implemented, this should also find src/dst regions
-    /// that might share files with the candidate regions.
-    pub(crate) async fn find_related_regions(
-        &self,
-        candidate_region_ids: &[RegionId],
-    ) -> Result<HashMap<RegionId, Vec<RegionId>>> {
-        Ok(candidate_region_ids.iter().map(|&r| (r, vec![r])).collect())
-    }
-
     /// Aggregate GC candidates by their corresponding datanode peer.
     pub(crate) async fn aggregate_candidates_by_datanode(
         &self,
@@ -210,28 +194,11 @@ impl GcScheduler {
         let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();
 
-        let all_related_regions = self.find_related_regions(&all_region_ids).await?;
-
-        let (region_to_peer, _) = self
-            .discover_datanodes_for_regions(&all_related_regions.keys().cloned().collect_vec())
-            .await?;
-
-        // Step 1: Get file references for all regions on this datanode
-        let file_refs_manifest = self
-            .ctx
-            .get_file_references(
-                &all_region_ids,
-                all_related_regions,
-                &region_to_peer,
-                self.config.mailbox_timeout,
-            )
-            .await?;
-
-        // Step 2: Create a single GcRegionProcedure for all regions on this datanode
+        // Run GC for all regions on this datanode in a single batch
         let (gc_report, fully_listed_regions) = {
             // Partition regions into full listing and fast listing in a single pass
-            let mut batch_full_listing_decisions =
+            let batch_full_listing_decisions =
                 self.batch_should_use_full_listing(&all_region_ids).await;
 
             let need_full_list_regions = batch_full_listing_decisions
@@ -242,7 +209,7 @@ impl GcScheduler {
                     },
                 )
                 .collect_vec();
-            let mut fast_list_regions = batch_full_listing_decisions
+            let fast_list_regions = batch_full_listing_decisions
                 .iter()
                 .filter_map(
                     |(&region_id, &need_full)| {
@@ -257,13 +224,7 @@ impl GcScheduler {
             if !fast_list_regions.is_empty() {
                 match self
                     .ctx
-                    .gc_regions(
-                        peer.clone(),
-                        &fast_list_regions,
-                        &file_refs_manifest,
-                        false,
-                        self.config.mailbox_timeout,
-                    )
+                    .gc_regions(&fast_list_regions, false, self.config.mailbox_timeout)
                     .await
                 {
                     Ok(report) => combined_report.merge(report),
@@ -284,13 +245,7 @@ impl GcScheduler {
             if !need_full_list_regions.is_empty() {
                 match self
                     .ctx
-                    .gc_regions(
-                        peer.clone(),
-                        &need_full_list_regions,
-                        &file_refs_manifest,
-                        true,
-                        self.config.mailbox_timeout,
-                    )
+                    .gc_regions(&need_full_list_regions, true, self.config.mailbox_timeout)
                    .await
                 {
                     Ok(report) => combined_report.merge(report),
@@ -330,98 +285,6 @@ impl GcScheduler {
         Ok(gc_report)
     }
 
-    /// Discover datanodes for the given regions(and it's related regions) by fetching table routes in batches.
-    /// Returns mappings from region to peer(leader, Vec<Peer>) and peer to regions.
-    async fn discover_datanodes_for_regions(
-        &self,
-        regions: &[RegionId],
-    ) -> Result<(Region2Peers, Peer2Regions)> {
-        let all_related_regions = self
-            .find_related_regions(regions)
-            .await?
-            .into_iter()
-            .flat_map(|(k, mut v)| {
-                v.push(k);
-                v
-            })
-            .collect_vec();
-        let mut region_to_peer = HashMap::new();
-        let mut peer_to_regions = HashMap::new();
-
-        // Group regions by table ID for batch processing
-        let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
-        for region_id in all_related_regions {
-            let table_id = region_id.table_id();
-            table_to_regions
-                .entry(table_id)
-                .or_default()
-                .push(region_id);
-        }
-
-        // Process each table's regions together for efficiency
-        for (table_id, table_regions) in table_to_regions {
-            match self.ctx.get_table_route(table_id).await {
-                Ok((_phy_table_id, table_route)) => {
-                    self.get_table_regions_peer(
-                        &table_route,
-                        &table_regions,
-                        &mut region_to_peer,
-                        &mut peer_to_regions,
-                    );
-                }
-                Err(e) => {
-                    // Continue with other tables instead of failing completely
-                    // TODO(discord9): consider failing here instead
-                    warn!(
-                        "Failed to get table route for table {}: {}, skipping its regions",
-                        table_id, e
-                    );
-                    continue;
-                }
-            }
-        }
-
-        Ok((region_to_peer, peer_to_regions))
-    }
-
-    /// Process regions for a single table to find their current leader peers.
-    fn get_table_regions_peer(
-        &self,
-        table_route: &PhysicalTableRouteValue,
-        table_regions: &[RegionId],
-        region_to_peer: &mut Region2Peers,
-        peer_to_regions: &mut Peer2Regions,
-    ) {
-        for &region_id in table_regions {
-            let mut found = false;
-
-            // Find the region in the table route
-            for region_route in &table_route.region_routes {
-                if region_route.region.id == region_id
-                    && let Some(leader_peer) = &region_route.leader_peer
-                {
-                    region_to_peer.insert(
-                        region_id,
-                        (leader_peer.clone(), region_route.follower_peers.clone()),
-                    );
-                    peer_to_regions
-                        .entry(leader_peer.clone())
-                        .or_default()
-                        .insert(region_id);
-                    found = true;
-                    break;
-                }
-            }
-
-            if !found {
-                warn!(
-                    "Failed to find region {} in table route or no leader peer found",
-                    region_id,
-                );
-            }
-        }
-    }
-
     async fn batch_should_use_full_listing(
         &self,
         region_ids: &[RegionId],
diff --git a/src/meta-srv/src/gc/mock.rs b/src/meta-srv/src/gc/mock.rs
index 96eb1cdf51..c5f840f391 100644
--- a/src/meta-srv/src/gc/mock.rs
+++ b/src/meta-srv/src/gc/mock.rs
@@ -36,10 +36,9 @@
 use store_api::storage::{FileRefsManifest, GcReport, RegionId};
 use table::metadata::TableId;
 use tokio::sync::mpsc::Sender;
 
-use crate::error::{Result, UnexpectedSnafu};
+use crate::error::Result;
 use crate::gc::candidate::GcCandidate;
 use crate::gc::ctx::SchedulerCtx;
-use crate::gc::handler::Region2Peers;
 use crate::gc::options::GcSchedulerOptions;
 use crate::gc::scheduler::{Event, GcScheduler};
 
@@ -67,12 +66,10 @@ pub struct MockSchedulerCtx {
     pub gc_reports: Arc<Mutex<HashMap<RegionId, GcReport>>>,
     pub candidates: Arc<Mutex<Option<HashMap<TableId, Vec<GcCandidate>>>>>,
     pub get_table_to_region_stats_calls: Arc<Mutex<usize>>,
-    pub get_file_references_calls: Arc<Mutex<usize>>,
     pub gc_regions_calls: Arc<Mutex<usize>>,
     // Error injection fields for testing
     pub get_table_to_region_stats_error: Arc<Mutex<Option<crate::error::Error>>>,
     pub get_table_route_error: Arc<Mutex<Option<crate::error::Error>>>,
-    pub get_file_references_error: Arc<Mutex<Option<crate::error::Error>>>,
     pub gc_regions_error: Arc<Mutex<Option<crate::error::Error>>>,
     // Retry testing fields
     pub gc_regions_retry_count: Arc<Mutex<HashMap<RegionId, usize>>>,
@@ -119,57 +116,12 @@ impl MockSchedulerCtx {
         *self.get_table_route_error.lock().unwrap() = Some(error);
     }
 
-    /// Set an error to be returned by `get_file_references`
-    #[allow(dead_code)]
-    pub fn with_get_file_references_error(self, error: crate::error::Error) -> Self {
-        *self.get_file_references_error.lock().unwrap() = Some(error);
-        self
-    }
-
     /// Set an error to be returned by `gc_regions`
     pub fn with_gc_regions_error(self, error: crate::error::Error) -> Self {
         *self.gc_regions_error.lock().unwrap() = Some(error);
         self
     }
-    /// Set a sequence of errors to be returned by `gc_regions` for retry testing
-    pub fn set_gc_regions_error_sequence(&self, errors: Vec<crate::error::Error>) {
-        *self.gc_regions_error_sequence.lock().unwrap() = errors;
-    }
-
-    /// Set success after a specific number of retries for a region
-    pub fn set_gc_regions_success_after_retries(&self, region_id: RegionId, retries: usize) {
-        self.gc_regions_success_after_retries
-            .lock()
-            .unwrap()
-            .insert(region_id, retries);
-    }
-
-    /// Get the retry count for a specific region
-    pub fn get_retry_count(&self, region_id: RegionId) -> usize {
-        self.gc_regions_retry_count
-            .lock()
-            .unwrap()
-            .get(&region_id)
-            .copied()
-            .unwrap_or(0)
-    }
-
-    /// Reset all retry tracking
-    pub fn reset_retry_tracking(&self) {
-        *self.gc_regions_retry_count.lock().unwrap() = HashMap::new();
-        *self.gc_regions_error_sequence.lock().unwrap() = Vec::new();
-        *self.gc_regions_success_after_retries.lock().unwrap() = HashMap::new();
-    }
-
-    /// Set an error to be returned for a specific region
-    pub fn set_gc_regions_error_for_region(&self, region_id: RegionId, error: crate::error::Error) {
-        self.gc_regions_per_region_errors
-            .lock()
-            .unwrap()
-            .insert(region_id, error);
-    }
-
     /// Clear per-region errors
     #[allow(unused)]
     pub fn clear_gc_regions_per_region_errors(&self) {
@@ -213,39 +165,9 @@ impl SchedulerCtx for MockSchedulerCtx {
             .unwrap_or_else(|| (table_id, PhysicalTableRouteValue::default())))
     }
 
-    async fn get_file_references(
-        &self,
-        query_regions: &[RegionId],
-        _related_regions: HashMap<RegionId, Vec<RegionId>>,
-        region_to_peer: &Region2Peers,
-        _timeout: Duration,
-    ) -> Result<FileRefsManifest> {
-        *self.get_file_references_calls.lock().unwrap() += 1;
-
-        // Check if we should return an injected error
-        if let Some(error) = self.get_file_references_error.lock().unwrap().take() {
-            return Err(error);
-        }
-        if query_regions
-            .iter()
-            .any(|region_id| !region_to_peer.contains_key(region_id))
-        {
-            UnexpectedSnafu {
-                violated: format!(
-                    "region_to_peer{region_to_peer:?} does not contain all region_ids requested: {:?}",
-                    query_regions
-                ),
-            }.fail()?;
-        }
-
-        Ok(self.file_refs.lock().unwrap().clone().unwrap_or_default())
-    }
-
     async fn gc_regions(
         &self,
-        _peer: Peer,
         region_ids: &[RegionId],
-        _file_refs_manifest: &FileRefsManifest,
         _full_file_listing: bool,
         _timeout: Duration,
     ) -> Result<GcReport> {
diff --git a/src/meta-srv/src/gc/mock/basic.rs b/src/meta-srv/src/gc/mock/basic.rs
index 2cf3679245..40cb932287 100644
--- a/src/meta-srv/src/gc/mock/basic.rs
+++ b/src/meta-srv/src/gc/mock/basic.rs
@@ -152,7 +152,6 @@ async fn test_handle_tick() {
     );
 
     assert_eq!(*ctx.get_table_to_region_stats_calls.lock().unwrap(), 1);
-    assert_eq!(*ctx.get_file_references_calls.lock().unwrap(), 1);
     assert_eq!(*ctx.gc_regions_calls.lock().unwrap(), 1);
 
     let tracker = scheduler.region_gc_tracker.lock().await;
diff --git a/src/meta-srv/src/gc/mock/err_handle.rs b/src/meta-srv/src/gc/mock/err_handle.rs
index a447904440..77671fd177 100644
--- a/src/meta-srv/src/gc/mock/err_handle.rs
+++ b/src/meta-srv/src/gc/mock/err_handle.rs
@@ -64,6 +64,7 @@ async fn test_gc_regions_failure_handling() {
             region_id,
             HashSet::from([FileRef::new(region_id, FileId::random(), None)]),
         )]),
+        cross_region_refs: HashMap::new(),
     };
 
     let ctx = Arc::new(
@@ -121,10 +122,6 @@ async fn test_gc_regions_failure_handling() {
         1,
         "Expected 1 call to get_table_to_region_stats"
     );
-    assert!(
-        *ctx.get_file_references_calls.lock().unwrap() >= 1,
-        "Expected at least 1 call to get_file_references"
-    );
     assert!(
         *ctx.gc_regions_calls.lock().unwrap() >= 1,
1, "Expected at least 1 call to gc_regions" @@ -206,13 +203,6 @@ async fn test_get_file_references_failure() { datanode_report.deleted_files[®ion_id].is_empty(), "Should have empty deleted files due to file refs failure" ); - - // Should still attempt to get file references (may be called multiple times due to retry logic) - assert!( - *ctx.get_file_references_calls.lock().unwrap() >= 1, - "Expected at least 1 call to get_file_references, got {}", - *ctx.get_file_references_calls.lock().unwrap() - ); } #[tokio::test] @@ -255,42 +245,22 @@ async fn test_get_table_route_failure() { last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())), }; - // Get candidates first - let stats = &ctx - .table_to_region_stats - .lock() - .unwrap() - .clone() - .unwrap_or_default(); - let candidates = scheduler.select_gc_candidates(stats).await.unwrap(); + // Test the full workflow to trigger table route failure during aggregation + // The table route failure should cause the entire GC cycle to fail + let result = scheduler.handle_tick().await; - // Convert table-based candidates to datanode-based candidates - let datanode_to_candidates = HashMap::from([( - Peer::new(1, ""), - candidates - .into_iter() - .flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c))) - .collect(), - )]); - - // This should handle table route failure gracefully - let report = scheduler - .parallel_process_datanodes(datanode_to_candidates) - .await; - - // Should process the datanode but handle route error gracefully - assert_eq!( - report.per_datanode_reports.len(), - 0, - "Expected 0 datanode report" - ); - assert_eq!( - report.failed_datanodes.len(), - 1, - "Expected 1 failed datanodes (route error handled gracefully)" - ); + // The table route failure should be propagated as an error assert!( - report.failed_datanodes.contains_key(&1), - "Failed datanodes should contain the datanode with route error" + result.is_err(), + "Expected table route failure to propagate as error" + ); + + // Verify the error message contains our simulated failure + let error = result.unwrap_err(); + let error_msg = format!("{}", error); + assert!( + error_msg.contains("Simulated table route failure for testing"), + "Error message should contain our simulated failure: {}", + error_msg ); } diff --git a/src/meta-srv/src/gc/mock/integration.rs b/src/meta-srv/src/gc/mock/integration.rs index 484871bb5e..1c0b90222c 100644 --- a/src/meta-srv/src/gc/mock/integration.rs +++ b/src/meta-srv/src/gc/mock/integration.rs @@ -123,11 +123,6 @@ async fn test_full_gc_workflow() { 1, "Expected 1 call to get_table_to_region_stats" ); - assert_eq!( - *ctx.get_file_references_calls.lock().unwrap(), - 1, - "Expected 1 call to get_file_references" - ); assert_eq!( *ctx.gc_regions_calls.lock().unwrap(), 1, diff --git a/src/meta-srv/src/gc/procedure.rs b/src/meta-srv/src/gc/procedure.rs index 039e542cd0..0a6c82bfbd 100644 --- a/src/meta-srv/src/gc/procedure.rs +++ b/src/meta-srv/src/gc/procedure.rs @@ -19,6 +19,8 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply}; +use common_meta::key::TableMetadataManagerRef; +use common_meta::key::table_route::PhysicalTableRouteValue; use common_meta::lock_key::RegionLock; use common_meta::peer::Peer; use common_procedure::error::ToJsonSnafu; @@ -26,14 +28,16 @@ use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, 
    Status,
 };
-use common_telemetry::{debug, error, info, warn};
+use common_telemetry::{error, info, warn};
 use itertools::Itertools as _;
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt as _;
 use store_api::storage::{FileRefsManifest, GcReport, RegionId};
+use table::metadata::TableId;
 
-use crate::error::{self, Result, SerializeToJsonSnafu};
-use crate::gc::Region2Peers;
+use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
+use crate::gc::util::table_route_to_region;
+use crate::gc::{Peer2Regions, Region2Peers};
 use crate::handler::HeartbeatMailbox;
 use crate::service::mailbox::{Channel, MailboxRef};
 
@@ -146,56 +150,73 @@ async fn send_gc_regions(
     }
 }
 
-/// TODO(discord9): another procedure which do both get file refs and gc regions.
-pub struct GcRegionProcedure {
+/// Procedure that fetches file references and then performs a batch GC for
+/// multiple regions; it holds locks on all involved regions for the whole procedure.
+pub struct BatchGcProcedure {
     mailbox: MailboxRef,
-    data: GcRegionData,
+    table_metadata_manager: TableMetadataManagerRef,
+    data: BatchGcData,
 }
 
 #[derive(Serialize, Deserialize)]
-pub struct GcRegionData {
+pub struct BatchGcData {
+    state: State,
+    /// Meta server address
     server_addr: String,
-    peer: Peer,
-    gc_regions: GcRegions,
-    description: String,
+    /// The regions to be GC-ed
+    regions: Vec<RegionId>,
+    full_file_listing: bool,
+    region_routes: Region2Peers,
+    /// Related regions (e.g., for shared files after repartition).
+    /// The source regions (where those files originally came from) are the keys,
+    /// and the destination regions (where the files are currently stored) are the values.
+    related_regions: HashMap<RegionId, HashSet<RegionId>>,
+    /// Acquired file references (populated in the Acquiring state)
+    file_refs: FileRefsManifest,
+    /// Mailbox timeout duration
     timeout: Duration,
+    gc_report: Option<GcReport>,
 }
 
-impl GcRegionProcedure {
-    pub const TYPE_NAME: &'static str = "metasrv-procedure::GcRegionProcedure";
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub enum State {
+    /// Initial state
+    Start,
+    /// Fetching file references from datanodes
+    Acquiring,
+    /// Sending GC instruction to the target datanode
+    Gcing,
+    /// Updating region repartition info in the kvbackend after GC, based on the GC result
+    UpdateRepartition,
+}
+
+impl BatchGcProcedure {
+    pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
 
     pub fn new(
         mailbox: MailboxRef,
+        table_metadata_manager: TableMetadataManagerRef,
         server_addr: String,
-        peer: Peer,
-        gc_regions: GcRegions,
-        description: String,
+        regions: Vec<RegionId>,
+        full_file_listing: bool,
         timeout: Duration,
     ) -> Self {
         Self {
             mailbox,
-            data: GcRegionData {
-                peer,
+            table_metadata_manager,
+            data: BatchGcData {
+                state: State::Start,
                 server_addr,
-                gc_regions,
-                description,
+                regions,
+                full_file_listing,
                 timeout,
+                region_routes: HashMap::new(),
+                related_regions: HashMap::new(),
+                file_refs: FileRefsManifest::default(),
+                gc_report: None,
             },
         }
     }
 
-    async fn send_gc_instr(&self) -> Result<GcReport> {
-        send_gc_regions(
-            &self.mailbox,
-            &self.data.peer,
-            self.data.gc_regions.clone(),
-            &self.data.server_addr,
-            self.data.timeout,
-            &self.data.description,
-        )
-        .await
-    }
-
     pub fn cast_result(res: Arc<dyn Any + Send + Sync>) -> Result<GcReport> {
         res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
             error::UnexpectedSnafu {
@@ -207,111 +228,129 @@
             .build()
         })
    }
-}
 
-#[async_trait::async_trait]
-impl Procedure for GcRegionProcedure {
-    fn type_name(&self) -> &str {
-        Self::TYPE_NAME
-    }
-
-    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
-        // Send GC instruction to the datanode. This procedure only handle lock&send, results or other kind of
-        // errors will be reported back via the oneshot channel.
-        let reply = self
-            .send_gc_instr()
+    async fn get_table_route(
+        &self,
+        table_id: TableId,
+    ) -> Result<(TableId, PhysicalTableRouteValue)> {
+        self.table_metadata_manager
+            .table_route_manager()
+            .get_physical_table_route(table_id)
             .await
-            .map_err(ProcedureError::external)?;
-
-        Ok(Status::done_with_output(reply))
+            .context(TableMetadataManagerSnafu)
     }
 
-    fn dump(&self) -> ProcedureResult<String> {
-        serde_json::to_string(&self.data).context(ToJsonSnafu)
-    }
-
-    /// Read lock all regions involved in this GC procedure.
-    /// So i.e. region migration won't happen during GC and cause race conditions.
-    ///
-    /// only read lock the regions not catatlog/schema because it can run concurrently with other procedures(i.e. drop database/table)
-    /// TODO:(discord9): integration test to verify this
-    fn lock_key(&self) -> LockKey {
-        let lock_key: Vec<_> = self
-            .data
-            .gc_regions
-            .regions
-            .iter()
-            .sorted() // sort to have a deterministic lock order
-            .map(|id| RegionLock::Read(*id).into())
-            .collect();
-
-        LockKey::new(lock_key)
-    }
-}
-
-/// Procedure to perform get file refs then batch GC for multiple regions, should only be used by admin function
-/// for triggering manual gc, as it holds locks for too long and for all regions during the procedure.
-pub struct BatchGcProcedure {
-    mailbox: MailboxRef,
-    data: BatchGcData,
-}
-
-#[derive(Serialize, Deserialize)]
-pub struct BatchGcData {
-    state: State,
-    server_addr: String,
-    /// The regions to be GC-ed
-    regions: Vec<RegionId>,
-    full_file_listing: bool,
-    region_routes: Region2Peers,
-    /// Related regions (e.g., for shared files). Map: RegionId -> List of related RegionIds.
-    related_regions: HashMap<RegionId, Vec<RegionId>>,
-    /// Acquired file references (Populated in Acquiring state)
-    file_refs: FileRefsManifest,
-    /// mailbox timeout duration
-    timeout: Duration,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
-pub enum State {
-    /// Initial state
-    Start,
-    /// Fetching file references from datanodes
-    Acquiring,
-    /// Sending GC instruction to the target datanode
-    Gcing,
-}
-
-impl BatchGcProcedure {
-    pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
-
-    pub fn new(
-        mailbox: MailboxRef,
-        server_addr: String,
-        regions: Vec<RegionId>,
-        full_file_listing: bool,
-        region_routes: Region2Peers,
-        related_regions: HashMap<RegionId, Vec<RegionId>>,
-        timeout: Duration,
-    ) -> Self {
-        Self {
-            mailbox,
-            data: BatchGcData {
-                state: State::Start,
-                server_addr,
-                regions,
-                full_file_listing,
-                region_routes,
-                related_regions,
-                file_refs: FileRefsManifest::default(),
-                timeout,
-            },
+    /// Return related regions for the given regions.
+    /// The returned map uses the source regions (where those files originally came from) as keys,
+    /// and the destination regions (where the files are currently stored) as values.
+    /// If a region is not found in the repartition manager, the returned map still has this
+    /// region as a key, just with an empty value.
+    async fn find_related_regions(
+        &self,
+        regions: &[RegionId],
+    ) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
+        let repart_mgr = self.table_metadata_manager.table_repart_manager();
+        let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
+        for src_region in regions {
+            // TODO(discord9): batch get
+            if let Some(dst_regions) = repart_mgr
+                .get_dst_regions(*src_region)
+                .await
+                .context(KvBackendSnafu)?
+            {
+                related_regions.insert(*src_region, dst_regions.into_iter().collect());
+            } else {
+                related_regions.insert(*src_region, Default::default());
+            }
+        }
+        Ok(related_regions)
+    }
+
+    /// Clean up region repartition info in the kvbackend after GC,
+    /// according to the cross-region references in `FileRefsManifest`.
+    async fn cleanup_region_repartition(&self) -> Result<()> {
+        for (src_region, dst_regions) in self.data.file_refs.cross_region_refs.iter() {
+            // TODO(discord9): batch update
+            self.table_metadata_manager
+                .table_repart_manager()
+                .update_mappings(*src_region, &dst_regions.iter().cloned().collect_vec())
+                .await
+                .context(KvBackendSnafu)?;
+        }
+        Ok(())
+    }
+
+    /// Discover region routes for the given regions.
+    async fn discover_route_for_regions(
+        &self,
+        regions: &[RegionId],
+    ) -> Result<(Region2Peers, Peer2Regions)> {
+        let mut region_to_peer = HashMap::new();
+        let mut peer_to_regions = HashMap::new();
+
+        // Group regions by table ID for batch processing
+        let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
+        for region_id in regions {
+            let table_id = region_id.table_id();
+            table_to_regions
+                .entry(table_id)
+                .or_default()
+                .push(*region_id);
+        }
+
+        // Process each table's regions together for efficiency
+        for (table_id, table_regions) in table_to_regions {
+            match self.get_table_route(table_id).await {
+                Ok((_phy_table_id, table_route)) => {
+                    table_route_to_region(
+                        &table_route,
+                        &table_regions,
+                        &mut region_to_peer,
+                        &mut peer_to_regions,
+                    );
+                }
+                Err(e) => {
+                    // Continue with other tables instead of failing completely
+                    // TODO(discord9): consider failing here instead
+                    warn!(
+                        "Failed to get table route for table {}: {}, skipping its regions",
+                        table_id, e
+                    );
+                    continue;
+                }
+            }
+        }
+
+        Ok((region_to_peer, peer_to_regions))
+    }
+
+    /// Set region routes and related regions for the GC procedure
+    async fn set_routes_and_related_regions(&mut self) -> Result<()> {
+        let related_regions = self.find_related_regions(&self.data.regions).await?;
+
+        self.data.related_regions = related_regions.clone();
+
+        // Discover routes for all regions involved in GC, including both the
+        // primary GC regions and their related regions.
+        let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
+
+        regions_set.extend(related_regions.keys().cloned());
+        regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
+
+        let regions_to_discover = regions_set.into_iter().collect_vec();
+
+        let (region_to_peer, _) = self
+            .discover_route_for_regions(&regions_to_discover)
+            .await?;
+
+        self.data.region_routes = region_to_peer;
+
+        Ok(())
+    }
 
     /// Get file references from all datanodes that host the regions
-    async fn get_file_references(&self) -> Result<FileRefsManifest> {
-        use std::collections::{HashMap, HashSet};
+    async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
+        self.set_routes_and_related_regions().await?;
 
         let query_regions = &self.data.regions;
         let related_regions = &self.data.related_regions;
@@ -344,20 +383,25 @@ impl BatchGcProcedure {
             }
         }
 
-        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
+        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
             HashMap::new();
-        for (related_region, queries) in related_regions {
-            if let Some((leader, _followers)) = region_routes.get(related_region) {
-                datanode2related_regions
-                    .entry(leader.clone())
-                    .or_default()
-                    .insert(*related_region, queries.clone());
-            } // since read from manifest, no need to send to followers
+        for (src_region, dst_regions) in related_regions {
+            for dst_region in dst_regions {
+                if let Some((leader, _followers)) = region_routes.get(dst_region) {
+                    datanode2related_regions
+                        .entry(leader.clone())
+                        .or_default()
+                        .entry(*src_region)
+                        .or_default()
+                        .insert(*dst_region);
+                } // since read from manifest, no need to send to followers
+            }
         }
 
         // Send GetFileRefs instructions to each datanode
         let mut all_file_refs: HashMap<RegionId, HashSet<FileRef>> = HashMap::new();
         let mut all_manifest_versions = HashMap::new();
+        let mut all_cross_region_refs = HashMap::new();
 
         for (peer, regions) in datanode2query_regions {
             let related_regions_for_peer =
@@ -400,17 +444,25 @@ impl BatchGcProcedure {
                 let entry = all_manifest_versions.entry(region_id).or_insert(version);
                 *entry = (*entry).min(version);
             }
+
+            for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
+                let entry = all_cross_region_refs
+                    .entry(region_id)
+                    .or_insert_with(HashSet::new);
+                entry.extend(related_region_ids);
+            }
         }
 
         Ok(FileRefsManifest {
             file_refs: all_file_refs,
            manifest_version: all_manifest_versions,
+            cross_region_refs: all_cross_region_refs,
         })
     }
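The version merge above keeps the *smallest* manifest version reported by any peer for a region, so an outdated replica can later be detected. A standalone sketch of that fold (types simplified to `u64` aliases):

```rust
use std::collections::HashMap;

type RegionId = u64;        // stand-in for store_api::storage::RegionId
type ManifestVersion = u64; // stand-in for the real ManifestVersion

/// Merge per-peer manifest versions, keeping the smallest version seen for
/// each region, mirroring the `or_insert` + `min` fold in the procedure.
fn merge_min(
    merged: &mut HashMap<RegionId, ManifestVersion>,
    from_one_peer: HashMap<RegionId, ManifestVersion>,
) {
    for (region_id, version) in from_one_peer {
        let entry = merged.entry(region_id).or_insert(version);
        *entry = (*entry).min(version);
    }
}

fn main() {
    let mut merged = HashMap::new();
    merge_min(&mut merged, HashMap::from([(1, 10), (2, 7)]));
    merge_min(&mut merged, HashMap::from([(1, 8)])); // an older view of region 1
    assert_eq!(merged[&1], 8);
    assert_eq!(merged[&2], 7);
}
```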
     /// Send GC instruction to all datanodes that host the regions,
     /// returns regions that need retry.
-    async fn send_gc_instructions(&self) -> Result<Vec<RegionId>> {
+    async fn send_gc_instructions(&self) -> Result<GcReport> {
         let regions = &self.data.regions;
         let region_routes = &self.data.region_routes;
         let file_refs = &self.data.file_refs;
@@ -418,6 +470,7 @@ impl BatchGcProcedure {
 
         // Group regions by datanode
         let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
+        let mut all_report = GcReport::default();
 
         for region_id in regions {
             if let Some((leader, _followers)) = region_routes.get(region_id) {
@@ -469,10 +522,15 @@ impl BatchGcProcedure {
                     peer, success, need_retry
                 );
             }
-            all_need_retry.extend(report.need_retry_regions);
+            all_need_retry.extend(report.need_retry_regions.clone());
+            all_report.merge(report);
         }
 
-        Ok(all_need_retry.into_iter().collect())
+        if !all_need_retry.is_empty() {
+            warn!("Regions need retry after batch GC: {:?}", all_need_retry);
+        }
+
+        Ok(all_report)
     }
 }
@@ -507,12 +565,10 @@ impl Procedure for BatchGcProcedure {
                 // Send GC instructions to all datanodes
                 // TODO(discord9): handle need-retry regions
                 match self.send_gc_instructions().await {
-                    Ok(_) => {
-                        info!(
-                            "Batch GC completed successfully for regions {:?}",
-                            self.data.regions
-                        );
-                        Ok(Status::done())
+                    Ok(report) => {
+                        self.data.state = State::UpdateRepartition;
+                        self.data.gc_report = Some(report);
+                        Ok(Status::executing(false))
                     }
                     Err(e) => {
                         error!("Failed to send GC instructions: {}", e);
@@ -520,6 +576,29 @@ impl Procedure for BatchGcProcedure {
                     }
                 }
             }
+            State::UpdateRepartition => match self.cleanup_region_repartition().await {
+                Ok(()) => {
+                    info!(
+                        "Cleanup region repartition info completed successfully for regions {:?}",
+                        self.data.regions
+                    );
+                    info!(
+                        "Batch GC completed successfully for regions {:?}",
+                        self.data.regions
+                    );
+                    let Some(report) = self.data.gc_report.take() else {
+                        return common_procedure::error::UnexpectedSnafu {
+                            err_msg: "GC report should be present after GC completion".to_string(),
+                        }
+                        .fail();
+                    };
+                    Ok(Status::done_with_output(report))
+                }
+                Err(e) => {
+                    error!("Failed to cleanup region repartition info: {}", e);
+                    Err(ProcedureError::external(e))
+                }
+            },
         }
     }
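The procedure now walks a four-state machine. Only the `Gcing` → `UpdateRepartition` transition is visible in the hunks above; the earlier transitions are inferred from the `State` doc comments, so treat this as an orientation sketch rather than the actual control flow:

```rust
/// Simplified sketch of how BatchGcProcedure advances through its states.
/// The real procedure is async, persists its data between steps, and signals
/// progress via Status::executing(..) / Status::done_with_output(..).
#[derive(Debug, PartialEq)]
enum State {
    Start,
    Acquiring,
    Gcing,
    UpdateRepartition,
    Done,
}

fn step(state: State) -> State {
    match state {
        // Resolve routes and related regions, then fetch file references.
        State::Start => State::Acquiring,
        // File references acquired; send the GC instructions next.
        State::Acquiring => State::Gcing,
        // GC replies folded into a GcReport; clean up repartition info.
        State::Gcing => State::UpdateRepartition,
        // Repartition mappings updated; emit the report as procedure output.
        State::UpdateRepartition => State::Done,
        State::Done => State::Done,
    }
}

fn main() {
    let mut s = State::Start;
    while s != State::Done {
        println!("{s:?}");
        s = step(s);
    }
}
```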
diff --git a/src/meta-srv/src/gc/scheduler.rs b/src/meta-srv/src/gc/scheduler.rs
index e3ed3834bb..4fd46755b7 100644
--- a/src/meta-srv/src/gc/scheduler.rs
+++ b/src/meta-srv/src/gc/scheduler.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -38,29 +38,6 @@ pub struct GcJobReport {
     pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
     pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
 }
-impl GcJobReport {
-    pub fn merge(&mut self, mut other: GcJobReport) {
-        // merge per_datanode_reports&failed_datanodes
-        for (dn_id, report) in other.per_datanode_reports {
-            let self_report = self.per_datanode_reports.entry(dn_id).or_default();
-            self_report.merge(report);
-        }
-        let all_failed_dn_ids = self
-            .failed_datanodes
-            .keys()
-            .cloned()
-            .chain(other.failed_datanodes.keys().cloned())
-            .collect::<HashSet<_>>();
-        for dn_id in all_failed_dn_ids {
-            let entry = self.failed_datanodes.entry(dn_id).or_default();
-            if let Some(other_errors) = other.failed_datanodes.remove(&dn_id) {
-                entry.extend(other_errors);
-            }
-        }
-        self.failed_datanodes
-            .retain(|dn_id, _| !self.per_datanode_reports.contains_key(dn_id));
-    }
-}
 
 /// [`Event`] represents various types of events that can be processed by the gc ticker.
 ///
diff --git a/src/meta-srv/src/gc/tracker.rs b/src/meta-srv/src/gc/tracker.rs
index c5f93483a6..836030c89f 100644
--- a/src/meta-srv/src/gc/tracker.rs
+++ b/src/meta-srv/src/gc/tracker.rs
@@ -30,15 +30,6 @@ pub(crate) struct RegionGcInfo {
     pub(crate) last_full_listing_time: Option<Instant>,
 }
 
-impl RegionGcInfo {
-    pub(crate) fn new(last_gc_time: Instant) -> Self {
-        Self {
-            last_gc_time,
-            last_full_listing_time: None,
-        }
-    }
-}
-
 /// Tracks the last GC time for regions to implement cooldown.
 pub(crate) type RegionGcTracker = HashMap<RegionId, RegionGcInfo>;
 
@@ -46,7 +37,7 @@ impl GcScheduler {
     /// Clean up stale entries from the region GC tracker if enough time has passed.
     /// This removes entries for regions that no longer exist in the current table routes.
     pub(crate) async fn cleanup_tracker_if_needed(&self) -> Result<()> {
-        let mut last_cleanup = *self.last_tracker_cleanup.lock().await;
+        let last_cleanup = *self.last_tracker_cleanup.lock().await;
         let now = Instant::now();
 
         // Check if enough time has passed since last cleanup
@@ -85,25 +76,6 @@ impl GcScheduler {
         Ok(())
     }
 
-    /// Determine if full file listing should be used for a region based on the last full listing time.
-    pub(crate) async fn should_use_full_listing(&self, region_id: RegionId) -> bool {
-        let gc_tracker = self.region_gc_tracker.lock().await;
-        let now = Instant::now();
-
-        if let Some(gc_info) = gc_tracker.get(&region_id) {
-            if let Some(last_full_listing) = gc_info.last_full_listing_time {
-                let elapsed = now.saturating_duration_since(last_full_listing);
-                elapsed >= self.config.full_file_listing_interval
-            } else {
-                // Never did full listing for this region, do it now
-                true
-            }
-        } else {
-            // First time GC for this region, do full listing
-            true
-        }
-    }
-
     pub(crate) async fn update_full_listing_time(
         &self,
         region_id: RegionId,
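The per-region `should_use_full_listing` removed above encodes the gating rule that the batched variant (`batch_should_use_full_listing`, not shown in this diff) presumably applies per region: do a full file listing on first GC, or once the configured interval has elapsed. Kept here as a reference sketch:

```rust
use std::time::{Duration, Instant};

/// Gating rule from the removed `should_use_full_listing`: a full listing is
/// due when a region has never been fully listed, or when the configured
/// interval has elapsed since the last full listing.
fn needs_full_listing(
    last_full_listing: Option<Instant>,
    interval: Duration,
    now: Instant,
) -> bool {
    match last_full_listing {
        Some(t) => now.saturating_duration_since(t) >= interval,
        // Never did a full listing for this region, do it now.
        None => true,
    }
}

fn main() {
    let now = Instant::now();
    assert!(needs_full_listing(None, Duration::from_secs(3600), now));
    assert!(!needs_full_listing(Some(now), Duration::from_secs(3600), now));
}
```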
diff --git a/src/meta-srv/src/gc/util.rs b/src/meta-srv/src/gc/util.rs
new file mode 100644
index 0000000000..f0308a6fc8
--- /dev/null
+++ b/src/meta-srv/src/gc/util.rs
@@ -0,0 +1,55 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_meta::key::table_route::PhysicalTableRouteValue;
+use common_telemetry::warn;
+use store_api::storage::RegionId;
+
+use crate::gc::{Peer2Regions, Region2Peers};
+
+pub fn table_route_to_region(
+    table_route: &PhysicalTableRouteValue,
+    table_regions: &[RegionId],
+    region_to_peer: &mut Region2Peers,
+    peer_to_regions: &mut Peer2Regions,
+) {
+    for &region_id in table_regions {
+        let mut found = false;
+
+        // Find the region in the table route
+        for region_route in &table_route.region_routes {
+            if region_route.region.id == region_id
+                && let Some(leader_peer) = &region_route.leader_peer
+            {
+                region_to_peer.insert(
+                    region_id,
+                    (leader_peer.clone(), region_route.follower_peers.clone()),
+                );
+                peer_to_regions
+                    .entry(leader_peer.clone())
+                    .or_default()
+                    .insert(region_id);
+                found = true;
+                break;
+            }
+        }
+
+        if !found {
+            warn!(
+                "Failed to find region {} in table route or no leader peer found",
+                region_id,
+            );
+        }
+    }
+}
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 3c0b551741..49bd39d3cc 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -81,7 +81,7 @@ mod apply_staging_manifest_test;
 mod puffin_index;
 
 use std::any::Any;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -303,7 +303,7 @@ impl MitoEngine {
     pub async fn get_snapshot_of_file_refs(
         &self,
         file_handle_regions: impl IntoIterator<Item = RegionId>,
-        manifest_regions: HashMap<RegionId, Vec<RegionId>>,
+        related_regions: HashMap<RegionId, HashSet<RegionId>>,
     ) -> Result<FileRefsManifest> {
         let file_ref_mgr = self.file_ref_manager();
 
@@ -315,15 +315,30 @@ impl MitoEngine {
             .filter_map(|region_id| self.find_region(region_id))
             .collect();
 
-        let related_regions: Vec<(MitoRegionRef, Vec<RegionId>)> = manifest_regions
-            .into_iter()
-            .filter_map(|(related_region, queries)| {
-                self.find_region(related_region).map(|r| (r, queries))
-            })
-            .collect();
+        let dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)> = {
+            let dst2src = related_regions
+                .into_iter()
+                .flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
+                .fold(
+                    HashMap::<RegionId, HashSet<RegionId>>::new(),
+                    |mut acc, (k, v)| {
+                        let entry = acc.entry(k).or_default();
+                        entry.insert(v);
+                        acc
+                    },
+                );
+            let mut dst_region_to_src_regions = Vec::with_capacity(dst2src.len());
+            for (dst_region, srcs) in dst2src {
+                let Some(dst_region) = self.find_region(dst_region) else {
+                    continue;
+                };
+                dst_region_to_src_regions.push((dst_region, srcs));
+            }
+            dst_region_to_src_regions
+        };
 
         file_ref_mgr
-            .get_snapshot_of_file_refs(query_regions, related_regions)
+            .get_snapshot_of_file_refs(query_regions, dst_region_to_src_regions)
             .await
     }
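The engine receives the map in src → {dst} orientation but must scan each destination manifest exactly once, so it inverts the map first. The flat_map + fold above, extracted into a standalone sketch (`RegionId` simplified to `u64`):

```rust
use std::collections::{HashMap, HashSet};

type RegionId = u64; // stand-in for store_api::storage::RegionId

/// Invert a src -> {dst} mapping into dst -> {src}, mirroring the fold in
/// `get_snapshot_of_file_refs`: each destination manifest is then scanned
/// once, looking for files owned by any of its source regions.
fn invert(src2dst: HashMap<RegionId, HashSet<RegionId>>) -> HashMap<RegionId, HashSet<RegionId>> {
    src2dst
        .into_iter()
        .flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
        .fold(HashMap::new(), |mut acc, (dst, src)| {
            acc.entry(dst).or_default().insert(src);
            acc
        })
}

fn main() {
    let src2dst = HashMap::from([(1, HashSet::from([2, 3])), (4, HashSet::from([2]))]);
    let dst2src = invert(src2dst);
    assert_eq!(dst2src[&2], HashSet::from([1, 4]));
    assert_eq!(dst2src[&3], HashSet::from([1]));
}
```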
diff --git a/src/mito2/src/gc/worker_test.rs b/src/mito2/src/gc/worker_test.rs
index 2d5a835aab..5e064b73b3 100644
--- a/src/mito2/src/gc/worker_test.rs
+++ b/src/mito2/src/gc/worker_test.rs
@@ -134,6 +134,7 @@ async fn test_gc_worker_basic_truncate() {
     let file_ref_manifest = FileRefsManifest {
         file_refs: Default::default(),
         manifest_version: [(region_id, version)].into(),
+        cross_region_refs: HashMap::new(),
     };
     let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
     let report = gc_worker.run().await.unwrap();
@@ -232,6 +233,7 @@ async fn test_gc_worker_truncate_with_ref() {
         )]
         .into(),
         manifest_version: [(region_id, version)].into(),
+        cross_region_refs: HashMap::new(),
     };
     let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
     let report = gc_worker.run().await.unwrap();
@@ -313,6 +315,7 @@ async fn test_gc_worker_basic_compact() {
     let file_ref_manifest = FileRefsManifest {
         file_refs: Default::default(),
         manifest_version: [(region_id, version)].into(),
+        cross_region_refs: HashMap::new(),
     };
 
     let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
@@ -399,6 +402,7 @@ async fn test_gc_worker_compact_with_ref() {
             .collect(),
         )]),
         manifest_version: [(region_id, version)].into(),
+        cross_region_refs: HashMap::new(),
     };
 
     let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs
index b36895f919..384f071d61 100644
--- a/src/mito2/src/sst/file_ref.rs
+++ b/src/mito2/src/sst/file_ref.rs
@@ -87,7 +87,7 @@ impl FileReferenceManager {
     pub(crate) async fn get_snapshot_of_file_refs(
         &self,
         query_regions_for_mem: Vec<MitoRegionRef>,
-        related_regions_in_manifest: Vec<(MitoRegionRef, Vec<RegionId>)>,
+        dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)>,
     ) -> Result<FileRefsManifest> {
         let mut ref_files = HashMap::new();
         // get from in memory file handles
@@ -99,12 +99,17 @@ impl FileReferenceManager {
 
         let mut manifest_version = HashMap::new();
 
+        let mut cross_region_refs = HashMap::new();
+
         // get file refs from related regions' manifests
-        for (related_region, queries) in &related_regions_in_manifest {
-            let queries = queries.iter().cloned().collect::<HashSet<_>>();
-            let manifest = related_region.manifest_ctx.manifest().await;
+        for (dst_region, src_regions) in &dst_region_to_src_regions {
+            let manifest = dst_region.manifest_ctx.manifest().await;
             for meta in manifest.files.values() {
-                if queries.contains(&meta.region_id) {
+                if src_regions.contains(&meta.region_id) {
+                    cross_region_refs
+                        .entry(meta.region_id)
+                        .or_insert_with(HashSet::new)
+                        .insert(dst_region.region_id());
                     // since gc couldn't happen together with repartition
                     // (both the queries and related_region acquire region read lock), no need to worry about
                     // staging manifest in repartition here.
@@ -119,7 +124,7 @@ impl FileReferenceManager {
                 }
             }
             // not sure if related region's manifest version is needed, but record it for now.
-            manifest_version.insert(related_region.region_id(), manifest.manifest_version);
+            manifest_version.insert(dst_region.region_id(), manifest.manifest_version);
         }
 
         for r in &query_regions_for_mem {
@@ -138,6 +143,7 @@ impl FileReferenceManager {
         Ok(FileRefsManifest {
             file_refs: ref_files,
             manifest_version,
+            cross_region_refs,
         })
     }
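While scanning a destination manifest, the manager now also records which source regions still own files there — this is what later drives the repartition-mapping cleanup in `BatchGcProcedure`. A simplified sketch of that bookkeeping (manifest entries reduced to `(owner, file)` pairs; all types here are illustrative stand-ins):

```rust
use std::collections::{HashMap, HashSet};

type RegionId = u64; // stand-in for store_api::storage::RegionId
type FileId = u64;   // stand-in for the real FileId

/// Sketch of the scan in `get_snapshot_of_file_refs`: walk one destination
/// region's manifest and record which source regions still own files there.
fn scan_manifest(
    dst_region: RegionId,
    src_regions: &HashSet<RegionId>,
    manifest_files: &[(RegionId, FileId)], // (owner region, file) pairs
    cross_region_refs: &mut HashMap<RegionId, HashSet<RegionId>>,
) {
    for (owner, _file) in manifest_files {
        if src_regions.contains(owner) {
            cross_region_refs.entry(*owner).or_default().insert(dst_region);
        }
    }
}

fn main() {
    let mut refs = HashMap::new();
    // Destination region 2 still holds two files owned by source region 1.
    scan_manifest(2, &HashSet::from([1]), &[(1, 10), (1, 11), (2, 12)], &mut refs);
    assert_eq!(refs[&1], HashSet::from([2]));
}
```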
diff --git a/src/store-api/src/storage/file.rs b/src/store-api/src/storage/file.rs
index f55f081224..341a4c301a 100644
--- a/src/store-api/src/storage/file.rs
+++ b/src/store-api/src/storage/file.rs
@@ -95,8 +95,15 @@ impl FileRef {
 #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
 pub struct FileRefsManifest {
     pub file_refs: HashMap<RegionId, HashSet<FileRef>>,
-    /// Manifest version when this manifest is read for it's files
+    /// Manifest version when this manifest is read for its files
     pub manifest_version: HashMap<RegionId, ManifestVersion>,
+    /// Cross-region file ownership mapping.
+    ///
+    /// Key is the source/original region id (before repartition); value is the set of
+    /// target/destination region ids (after repartition) that currently hold files
+    /// originally coming from that source region.
+    pub cross_region_refs: HashMap<RegionId, HashSet<RegionId>>,
 }
 
 #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -179,6 +186,8 @@ mod tests {
         .insert(r1, [FileRef::new(r1, FileId::random(), None)].into());
         manifest.manifest_version.insert(r0, 10);
         manifest.manifest_version.insert(r1, 20);
+        manifest.cross_region_refs.insert(r0, [r1].into());
+        manifest.cross_region_refs.insert(r1, [r0].into());
 
         let json = serde_json::to_string(&manifest).unwrap();
         let parsed: FileRefsManifest = serde_json::from_str(&json).unwrap();
diff --git a/tests-integration/src/tests/gc.rs b/tests-integration/src/tests/gc.rs
index f627291c4a..6d72eb36db 100644
--- a/tests-integration/src/tests/gc.rs
+++ b/tests-integration/src/tests/gc.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 use std::time::Duration;
 
 use common_meta::key::TableMetadataManagerRef;
@@ -105,8 +105,10 @@ async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirG
 
 #[tokio::test]
 async fn test_gc_basic_different_store() {
+    let _ = dotenv::dotenv();
     common_telemetry::init_default_ut_logging();
     let store_type = StorageType::build_storage_types_based_on_env();
+    info!("store type: {:?}", store_type);
     for store in store_type {
         if store == StorageType::File {
             continue; // no point in test gc in fs storage
@@ -190,17 +192,16 @@ async fn test_gc_basic(store_type: &StorageType) {
     assert_eq!(sst_files_after_compaction.len(), 5); // 4 old + 1 new
 
     // Step 5: Get table route information for GC procedure
-    let (region_routes, regions) =
+    let (_region_routes, regions) =
         get_table_route(metasrv.table_metadata_manager(), table_id).await;
 
     // Step 6: Create and execute BatchGcProcedure
     let procedure = BatchGcProcedure::new(
         metasrv.mailbox().clone(),
+        metasrv.table_metadata_manager().clone(),
         metasrv.options().grpc.server_addr.clone(),
         regions.clone(),
-        false, // full_file_listing
-        region_routes,
-        HashMap::new(), // related_regions (empty for this simple test)
+        false,                   // full_file_listing
         Duration::from_secs(10), // timeout
     );
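Finally, `send_gc_instructions` now folds every datanode's `GcReport` into a single output via `merge`. `GcReport` itself is not defined in this diff; the sketch below uses only the two fields the diff actually touches (`deleted_files`, `need_retry_regions`) and is an illustration of the accumulation pattern, not the real type:

```rust
use std::collections::{HashMap, HashSet};

type RegionId = u64; // stand-in for store_api::storage::RegionId
type FileId = u64;   // stand-in for the real FileId

/// Illustrative stand-in for store_api::storage::GcReport, limited to the
/// fields referenced in this diff.
#[derive(Default, Debug)]
struct GcReport {
    deleted_files: HashMap<RegionId, Vec<FileId>>,
    need_retry_regions: HashSet<RegionId>,
}

impl GcReport {
    /// Fold another datanode's report into this one, the way
    /// `all_report.merge(report)` accumulates results in the procedure.
    fn merge(&mut self, other: GcReport) {
        for (region, files) in other.deleted_files {
            self.deleted_files.entry(region).or_default().extend(files);
        }
        self.need_retry_regions.extend(other.need_retry_regions);
    }
}

fn main() {
    let mut total = GcReport::default();
    total.merge(GcReport {
        deleted_files: HashMap::from([(1, vec![10, 11])]),
        need_retry_regions: HashSet::new(),
    });
    total.merge(GcReport {
        deleted_files: HashMap::from([(1, vec![12])]),
        need_retry_regions: HashSet::from([2]),
    });
    assert_eq!(total.deleted_files[&1].len(), 3);
    assert!(total.need_retry_regions.contains(&2));
}
```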