diff --git a/src/meta-srv/src/gc/ctx.rs b/src/meta-srv/src/gc/ctx.rs index 8583b6375b..8ce83be362 100644 --- a/src/meta-srv/src/gc/ctx.rs +++ b/src/meta-srv/src/gc/ctx.rs @@ -32,7 +32,7 @@ use table::metadata::TableId; use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu}; use crate::gc::Region2Peers; -use crate::gc::procedure::GcRegionProcedure; +use crate::gc::procedure::{BatchGcProcedure, GcRegionProcedure}; use crate::handler::HeartbeatMailbox; use crate::service::mailbox::{Channel, MailboxRef}; @@ -101,7 +101,26 @@ impl DefaultGcSchedulerCtx { ) .context(error::RegisterProcedureLoaderSnafu { type_name: GcRegionProcedure::TYPE_NAME, - }); + })?; + + // register a noop loader for `BatchGcProcedure` to avoid error when deserializing procedure when rebooting + + procedure_manager + .register_loader( + BatchGcProcedure::TYPE_NAME, + Box::new(move |json| { + common_procedure::error::ProcedureLoaderNotImplementedSnafu { + type_name: BatchGcProcedure::TYPE_NAME.to_string(), + reason: + "Batch GC procedure should not be reloaded from storage, as it doesn't need to be retried if interrupted" + .to_string(), + } + .fail() + }), + ) + .context(error::RegisterProcedureLoaderSnafu { + type_name: BatchGcProcedure::TYPE_NAME, + })?; Ok(Self { table_metadata_manager, diff --git a/src/meta-srv/src/gc/procedure.rs b/src/meta-srv/src/gc/procedure.rs index 8a6a9ccff4..4ddd606630 100644 --- a/src/meta-srv/src/gc/procedure.rs +++ b/src/meta-srv/src/gc/procedure.rs @@ -13,11 +13,12 @@ // limitations under the License. use std::any::Any; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use api::v1::meta::MailboxMessage; -use common_meta::instruction::{self, GcRegions, InstructionReply}; +use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply}; use common_meta::lock_key::RegionLock; use common_meta::peer::Peer; use common_procedure::error::ToJsonSnafu; @@ -25,16 +26,126 @@ use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, }; -use common_telemetry::error; +use common_telemetry::{debug, error, info, warn}; use itertools::Itertools as _; use serde::{Deserialize, Serialize}; use snafu::ResultExt as _; -use store_api::storage::GcReport; +use store_api::storage::{FileRefsManifest, GcReport, RegionId}; use crate::error::{self, Result, SerializeToJsonSnafu}; +use crate::gc::Region2Peers; use crate::handler::HeartbeatMailbox; use crate::service::mailbox::{Channel, MailboxRef}; +/// Helper function to send GetFileRefs instruction and wait for reply. +async fn send_get_file_refs( + mailbox: &MailboxRef, + server_addr: &str, + peer: &Peer, + instruction: GetFileRefs, + timeout: Duration, +) -> Result { + let instruction = instruction::Instruction::GetFileRefs(instruction); + let msg = MailboxMessage::json_message( + &format!("Get file references: {}", instruction), + &format!("Metasrv@{}", server_addr), + &format!("Datanode-{}@{}", peer.id, peer.addr), + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + let mailbox_rx = mailbox + .send(&Channel::Datanode(peer.id), msg, timeout) + .await?; + + let reply = match mailbox_rx.await { + Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?, + Err(e) => { + error!( + "Failed to receive reply from datanode {} for GetFileRefs: {}", + peer, e + ); + return Err(e); + } + }; + + let InstructionReply::GetFileRefs(reply) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: format!("{:?}", reply), + reason: "Unexpected reply of the GetFileRefs instruction", + } + .fail(); + }; + + Ok(reply) +} + +/// Helper function to send GcRegions instruction and wait for reply. +async fn send_gc_regions( + mailbox: &MailboxRef, + peer: &Peer, + gc_regions: GcRegions, + server_addr: &str, + timeout: Duration, + description: &str, +) -> Result { + let instruction = instruction::Instruction::GcRegions(gc_regions.clone()); + let msg = MailboxMessage::json_message( + &format!("{}: {}", description, instruction), + &format!("Metasrv@{}", server_addr), + &format!("Datanode-{}@{}", peer.id, peer.addr), + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + let mailbox_rx = mailbox + .send(&Channel::Datanode(peer.id), msg, timeout) + .await?; + + let reply = match mailbox_rx.await { + Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?, + Err(e) => { + error!( + "Failed to receive reply from datanode {} for {}: {}", + peer, description, e + ); + return Err(e); + } + }; + + let InstructionReply::GcRegions(reply) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: format!("{:?}", reply), + reason: "Unexpected reply of the GcRegions instruction", + } + .fail(); + }; + + let res = reply.result; + match res { + Ok(report) => Ok(report), + Err(e) => { + error!( + "Datanode {} reported error during GC for regions {:?}: {}", + peer, gc_regions, e + ); + error::UnexpectedSnafu { + violated: format!( + "Datanode {} reported error during GC for regions {:?}: {}", + peer, gc_regions, e + ), + } + .fail() + } + } +} + /// TODO(discord9): another procedure which do both get file refs and gc regions. pub struct GcRegionProcedure { mailbox: MailboxRef, @@ -74,60 +185,15 @@ impl GcRegionProcedure { } async fn send_gc_instr(&self) -> Result { - let peer = &self.data.peer; - let instruction = instruction::Instruction::GcRegions(self.data.gc_regions.clone()); - let msg = MailboxMessage::json_message( - &format!("{}: {}", self.data.description, instruction), - &format!("Metasrv@{}", self.data.server_addr), - &format!("Datanode-{}@{}", peer.id, peer.addr), - common_time::util::current_time_millis(), - &instruction, + send_gc_regions( + &self.mailbox, + &self.data.peer, + self.data.gc_regions.clone(), + &self.data.server_addr, + self.data.timeout, + &self.data.description, ) - .with_context(|_| SerializeToJsonSnafu { - input: instruction.to_string(), - })?; - - let mailbox_rx = self - .mailbox - .send(&Channel::Datanode(peer.id), msg, self.data.timeout) - .await?; - - let reply = match mailbox_rx.await { - Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?, - Err(e) => { - error!( - "Failed to receive reply from datanode {} for {}: {}", - peer, self.data.description, e - ); - return Err(e); - } - }; - - let InstructionReply::GcRegions(reply) = reply else { - return error::UnexpectedInstructionReplySnafu { - mailbox_message: format!("{:?}", reply), - reason: "Unexpected reply of the GcRegions instruction", - } - .fail(); - }; - - let res = reply.result; - match res { - Ok(report) => Ok(report), - Err(e) => { - error!( - "Datanode {} reported error during GC for regions {:?}: {}", - peer, self.data.gc_regions, e - ); - Err(error::UnexpectedSnafu { - violated: format!( - "Datanode {} reported error during GC for regions {:?}: {}", - peer, self.data.gc_regions, e - ), - } - .fail()?) - } - } + .await } pub fn cast_result(res: Arc) -> Result { @@ -164,10 +230,10 @@ impl Procedure for GcRegionProcedure { serde_json::to_string(&self.data).context(ToJsonSnafu) } - /// Write lock all regions involved in this GC procedure. + /// Read lock all regions involved in this GC procedure. /// So i.e. region migration won't happen during GC and cause race conditions. /// - /// only write lock the regions not catatlog/schema because it can run concurrently with other procedures(i.e. drop database/table) + /// only read lock the regions not catatlog/schema because it can run concurrently with other procedures(i.e. drop database/table) /// TODO:(discord9): integration test to verify this fn lock_key(&self) -> LockKey { let lock_key: Vec<_> = self @@ -182,3 +248,297 @@ impl Procedure for GcRegionProcedure { LockKey::new(lock_key) } } + +/// Procedure to perform get file refs then batch GC for multiple regions, should only be used by admin function +/// for triggering manual gc, as it holds locks for too long and for all regions during the procedure. +pub struct BatchGcProcedure { + mailbox: MailboxRef, + data: BatchGcData, +} + +#[derive(Serialize, Deserialize)] +pub struct BatchGcData { + state: State, + server_addr: String, + /// The regions to be GC-ed + regions: Vec, + full_file_listing: bool, + region_routes: Region2Peers, + /// Related regions (e.g., for shared files). Map: RegionId -> List of related RegionIds. + related_regions: HashMap>, + /// Acquired file references (Populated in Acquiring state) + file_refs: FileRefsManifest, + /// mailbox timeout duration + timeout: Duration, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum State { + /// Initial state + Start, + /// Fetching file references from datanodes + Acquiring, + /// Sending GC instruction to the target datanode + Gcing, +} + +impl BatchGcProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure"; + + pub fn new( + mailbox: MailboxRef, + server_addr: String, + regions: Vec, + full_file_listing: bool, + region_routes: Region2Peers, + related_regions: HashMap>, + timeout: Duration, + ) -> Self { + Self { + mailbox, + data: BatchGcData { + state: State::Start, + server_addr, + regions, + full_file_listing, + region_routes, + related_regions, + file_refs: FileRefsManifest::default(), + timeout, + }, + } + } + + /// Get file references from all datanodes that host the regions + async fn get_file_references(&self) -> Result { + use std::collections::{HashMap, HashSet}; + + let query_regions = &self.data.regions; + let related_regions = &self.data.related_regions; + let region_routes = &self.data.region_routes; + let timeout = self.data.timeout; + + // Group regions by datanode to minimize RPC calls + let mut datanode2query_regions: HashMap> = HashMap::new(); + + for region_id in query_regions { + if let Some((leader, followers)) = region_routes.get(region_id) { + datanode2query_regions + .entry(leader.clone()) + .or_default() + .push(*region_id); + // also need to send for follower regions for file refs in case query is running on follower + for follower in followers { + datanode2query_regions + .entry(follower.clone()) + .or_default() + .push(*region_id); + } + } else { + return error::UnexpectedSnafu { + violated: format!( + "region_routes: {region_routes:?} does not contain region_id: {region_id}", + ), + } + .fail(); + } + } + + let mut datanode2related_regions: HashMap>> = + HashMap::new(); + for (related_region, queries) in related_regions { + if let Some((leader, _followers)) = region_routes.get(related_region) { + datanode2related_regions + .entry(leader.clone()) + .or_default() + .insert(*related_region, queries.clone()); + } // since read from manifest, no need to send to followers + } + + // Send GetFileRefs instructions to each datanode + let mut all_file_refs: HashMap> = + HashMap::new(); + let mut all_manifest_versions = HashMap::new(); + + for (peer, regions) in datanode2query_regions { + let related_regions_for_peer = + datanode2related_regions.remove(&peer).unwrap_or_default(); + + let instruction = GetFileRefs { + query_regions: regions.clone(), + related_regions: related_regions_for_peer, + }; + + let reply = send_get_file_refs( + &self.mailbox, + &self.data.server_addr, + &peer, + instruction, + timeout, + ) + .await?; + + if !reply.success { + return error::UnexpectedSnafu { + violated: format!( + "Failed to get file references from datanode {}: {:?}", + peer, reply.error + ), + } + .fail(); + } + + // Merge the file references from this datanode + for (region_id, file_refs) in reply.file_refs_manifest.file_refs { + all_file_refs + .entry(region_id) + .or_default() + .extend(file_refs); + } + + // region manifest version should be the smallest one among all peers, so outdated region can be detected + for (region_id, version) in reply.file_refs_manifest.manifest_version { + let entry = all_manifest_versions.entry(region_id).or_insert(version); + *entry = (*entry).min(version); + } + } + + Ok(FileRefsManifest { + file_refs: all_file_refs, + manifest_version: all_manifest_versions, + }) + } + + /// Send GC instruction to all datanodes that host the regions, + /// returns regions that need retry. + async fn send_gc_instructions(&self) -> Result> { + let regions = &self.data.regions; + let region_routes = &self.data.region_routes; + let file_refs = &self.data.file_refs; + let timeout = self.data.timeout; + + // Group regions by datanode + let mut datanode2regions: HashMap> = HashMap::new(); + + for region_id in regions { + if let Some((leader, _followers)) = region_routes.get(region_id) { + datanode2regions + .entry(leader.clone()) + .or_default() + .push(*region_id); + } else { + return error::UnexpectedSnafu { + violated: format!( + "region_routes: {region_routes:?} does not contain region_id: {region_id}", + ), + } + .fail(); + } + } + + let mut all_need_retry = HashSet::new(); + // Send GC instructions to each datanode + for (peer, regions_for_peer) in datanode2regions { + let gc_regions = GcRegions { + regions: regions_for_peer.clone(), + // file_refs_manifest can be large; cloning for each datanode is acceptable here since this is an admin-only operation. + file_refs_manifest: file_refs.clone(), + full_file_listing: self.data.full_file_listing, + }; + + let report = send_gc_regions( + &self.mailbox, + &peer, + gc_regions, + self.data.server_addr.as_str(), + timeout, + "Batch GC", + ) + .await?; + + let success = report.deleted_files.keys().collect_vec(); + let need_retry = report.need_retry_regions.iter().cloned().collect_vec(); + + if need_retry.is_empty() { + info!( + "GC report from datanode {}: successfully deleted files for regions {:?}", + peer, success + ); + } else { + warn!( + "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}", + peer, success, need_retry + ); + } + all_need_retry.extend(report.need_retry_regions); + } + + Ok(all_need_retry.into_iter().collect()) + } +} + +#[async_trait::async_trait] +impl Procedure for BatchGcProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + match self.data.state { + State::Start => { + // Transition to Acquiring state + self.data.state = State::Acquiring; + Ok(Status::executing(false)) + } + State::Acquiring => { + // Get file references from all datanodes + match self.get_file_references().await { + Ok(file_refs) => { + self.data.file_refs = file_refs; + self.data.state = State::Gcing; + Ok(Status::executing(false)) + } + Err(e) => { + error!("Failed to get file references: {}", e); + Err(ProcedureError::external(e)) + } + } + } + State::Gcing => { + // Send GC instructions to all datanodes + // TODO(discord9): handle need-retry regions + match self.send_gc_instructions().await { + Ok(_) => { + info!( + "Batch GC completed successfully for regions {:?}", + self.data.regions + ); + Ok(Status::done()) + } + Err(e) => { + error!("Failed to send GC instructions: {}", e); + Err(ProcedureError::external(e)) + } + } + } + } + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + /// Read lock all regions involved in this GC procedure. + /// So i.e. region migration won't happen during GC and cause race conditions. + fn lock_key(&self) -> LockKey { + let lock_key: Vec<_> = self + .data + .regions + .iter() + .sorted() // sort to have a deterministic lock order + .map(|id| RegionLock::Read(*id).into()) + .collect(); + + LockKey::new(lock_key) + } +} diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index 04d59f2d99..c61b569cef 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -208,10 +208,6 @@ impl LocalGcWorker { } /// Get tmp ref files for all current regions - /// - /// Outdated regions are added to `outdated_regions` set, which means their manifest version in - /// self.file_ref_manifest is older than the current manifest version on datanode. - /// so they need to retry GC later by metasrv with updated tmp ref files. pub async fn read_tmp_ref_files(&self) -> Result>> { let mut tmp_ref_files = HashMap::new(); for (region_id, file_refs) in &self.file_ref_manifest.file_refs {