diff --git a/src/meta-srv/src/gc/procedure.rs b/src/meta-srv/src/gc/procedure.rs index bf56a2f4cd..1302191be8 100644 --- a/src/meta-srv/src/gc/procedure.rs +++ b/src/meta-srv/src/gc/procedure.rs @@ -30,6 +30,7 @@ use common_procedure::{ Result as ProcedureResult, Status, }; use common_telemetry::{debug, error, info, warn}; +use futures::future::join_all; use itertools::Itertools as _; use serde::{Deserialize, Serialize}; use snafu::ResultExt as _; @@ -41,29 +42,7 @@ use crate::gc::util::table_route_to_region; use crate::gc::{Peer2Regions, Region2Peers}; use crate::handler::HeartbeatMailbox; use crate::metrics::{METRIC_META_GC_DATANODE_CALLS_TOTAL, METRIC_META_GC_FAILED_REGIONS_TOTAL}; -use crate::service::mailbox::{Channel, MailboxRef}; - -/// Helper function to send GetFileRefs instruction and wait for reply. -async fn send_get_file_refs( - mailbox: &MailboxRef, - server_addr: &str, - peer: &Peer, - instruction: GetFileRefs, - timeout: Duration, -) -> Result { - let result = send_get_file_refs_inner(mailbox, server_addr, peer, instruction, timeout).await; - - match &result { - Ok(_) => METRIC_META_GC_DATANODE_CALLS_TOTAL - .with_label_values(&["get_file_refs", "success"]) - .inc(), - Err(_) => METRIC_META_GC_DATANODE_CALLS_TOTAL - .with_label_values(&["get_file_refs", "error"]) - .inc(), - } - - result -} +use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef}; async fn send_get_file_refs_inner( mailbox: &MailboxRef, @@ -71,7 +50,7 @@ async fn send_get_file_refs_inner( peer: &Peer, instruction: GetFileRefs, timeout: Duration, -) -> Result { +) -> Result { let instruction = instruction::Instruction::GetFileRefs(instruction); let msg = MailboxMessage::json_message( &format!("Get file references: {}", instruction), @@ -84,10 +63,15 @@ async fn send_get_file_refs_inner( input: instruction.to_string(), })?; - let mailbox_rx = mailbox + mailbox .send(&Channel::Datanode(peer.id), msg, timeout) - .await?; + .await +} +async fn recv_get_file_refs_reply( + peer: &Peer, + mailbox_rx: MailboxReceiver, +) -> Result { let reply = match mailbox_rx.await { Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?, Err(e) => { @@ -110,52 +94,14 @@ async fn send_get_file_refs_inner( Ok(reply) } -/// Helper function to send GcRegions instruction and wait for reply. -async fn send_gc_regions( - mailbox: &MailboxRef, - peer: &Peer, - gc_regions: GcRegions, - server_addr: &str, - timeout: Duration, - description: &str, -) -> Result { - let failed_region_count = gc_regions.regions.len() as u64; - let result = - send_gc_regions_inner(mailbox, peer, gc_regions, server_addr, timeout, description).await; - - match result { - Ok(report) => { - METRIC_META_GC_DATANODE_CALLS_TOTAL - .with_label_values(&["gc_regions", "success"]) - .inc(); - - let need_retry_count = report.need_retry_regions.len() as u64; - if need_retry_count > 0 { - METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count); - } - - Ok(report) - } - Err(e) => { - METRIC_META_GC_DATANODE_CALLS_TOTAL - .with_label_values(&["gc_regions", "error"]) - .inc(); - if failed_region_count > 0 { - METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(failed_region_count); - } - Err(e) - } - } -} - async fn send_gc_regions_inner( mailbox: &MailboxRef, peer: &Peer, - gc_regions: GcRegions, + gc_regions: &GcRegions, server_addr: &str, timeout: Duration, description: &str, -) -> Result { +) -> Result { let instruction = instruction::Instruction::GcRegions(gc_regions.clone()); let msg = MailboxMessage::json_message( &format!("{}: {}", description, instruction), @@ -168,10 +114,17 @@ async fn send_gc_regions_inner( input: instruction.to_string(), })?; - let mailbox_rx = mailbox + mailbox .send(&Channel::Datanode(peer.id), msg, timeout) - .await?; + .await +} +async fn recv_gc_regions_reply( + peer: &Peer, + gc_regions: &GcRegions, + description: &str, + mailbox_rx: MailboxReceiver, +) -> Result { let reply = match mailbox_rx.await { Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?, Err(e) => { @@ -636,6 +589,10 @@ impl BatchGcProcedure { peers.extend(datanode2query_regions.keys().cloned()); peers.extend(datanode2related_regions.keys().cloned()); + let mailbox = &self.mailbox; + let server_addr = &self.data.server_addr; + let mut tasks = Vec::new(); + for peer in peers { let regions = datanode2query_regions.remove(&peer).unwrap_or_default(); let related_regions_for_peer = @@ -645,33 +602,75 @@ impl BatchGcProcedure { continue; } - let instruction = GetFileRefs { - query_regions: regions.clone(), - related_regions: related_regions_for_peer.clone(), - }; + tasks.push(async move { + let instruction = GetFileRefs { + query_regions: regions.clone(), + related_regions: related_regions_for_peer.clone(), + }; - let reply = send_get_file_refs( - &self.mailbox, - &self.data.server_addr, - &peer, - instruction, - timeout, - ) - .await?; + let reply = + send_get_file_refs_inner(mailbox, server_addr, &peer, instruction, timeout) + .await; + + (peer, regions, related_regions_for_peer, reply) + }); + } + + let mut recv_tasks = Vec::new(); + // store error to make sure metrics doesn't ignore other peers + let mut first_error = None; + let mut record_get_file_refs_error = |e| { + METRIC_META_GC_DATANODE_CALLS_TOTAL + .with_label_values(&["get_file_refs", "error"]) + .inc(); + if first_error.is_none() { + first_error = Some(e); + } + }; + for (peer, regions, related_regions_for_peer, reply) in join_all(tasks).await { + match reply { + Ok(mailbox_rx) => { + recv_tasks.push(async move { + let reply = recv_get_file_refs_reply(&peer, mailbox_rx).await; + (peer, regions, related_regions_for_peer, reply) + }); + } + Err(e) => record_get_file_refs_error(e), + } + } + + let replies = join_all(recv_tasks).await; + + for (peer, regions, related_regions_for_peer, reply) in replies { + let reply = match reply { + Ok(reply) => reply, + Err(e) => { + record_get_file_refs_error(e); + continue; + } + }; debug!( "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}", peer, regions, related_regions_for_peer, reply ); if !reply.success { - return error::UnexpectedSnafu { + METRIC_META_GC_DATANODE_CALLS_TOTAL + .with_label_values(&["get_file_refs", "error"]) + .inc(); + let err = error::UnexpectedSnafu { violated: format!( "Failed to get file references from datanode {}: {:?}", peer, reply.error ), } - .fail(); + .build(); + record_get_file_refs_error(err); + continue; } + METRIC_META_GC_DATANODE_CALLS_TOTAL + .with_label_values(&["get_file_refs", "success"]) + .inc(); // Merge the file references from this datanode for (region_id, file_refs) in reply.file_refs_manifest.file_refs { @@ -695,6 +694,10 @@ impl BatchGcProcedure { } } + if let Some(e) = first_error { + return Err(e); + } + Ok(FileRefsManifest { file_refs: all_file_refs, manifest_version: all_manifest_versions, @@ -731,24 +734,80 @@ impl BatchGcProcedure { } let mut all_need_retry = HashSet::new(); - // Send GC instructions to each datanode - for (peer, regions_for_peer) in datanode2regions { - let gc_regions = GcRegions { - regions: regions_for_peer.clone(), - // file_refs_manifest can be large; cloning for each datanode is acceptable here since this is an admin-only operation. - file_refs_manifest: file_refs.clone(), - full_file_listing: self.data.full_file_listing, - }; + let mailbox = &self.mailbox; + let server_addr = self.data.server_addr.as_str(); + let full_file_listing = self.data.full_file_listing; + let tasks = datanode2regions + .into_iter() + .map(|(peer, regions_for_peer)| { + let gc_regions = GcRegions { + regions: regions_for_peer.clone(), + // file_refs_manifest could be somewhere large. But still intentionally clone per datanode here: + // this path is admin-triggered or scheduler-triggered, peer count is expected to be bounded, and + // and abnormal manifest growth should be addressed at the source + file_refs_manifest: file_refs.clone(), + full_file_listing, + }; + let region_count = gc_regions.regions.len() as u64; - let report = send_gc_regions( - &self.mailbox, - &peer, - gc_regions, - self.data.server_addr.as_str(), - timeout, - "Batch GC", - ) - .await?; + async move { + let report = send_gc_regions_inner( + mailbox, + &peer, + &gc_regions, + server_addr, + timeout, + "Batch GC", + ) + .await; + + (peer, gc_regions, region_count, report) + } + }); + + let mut recv_tasks = Vec::new(); + let mut first_error = None; + let mut record_gc_error = |e, region_count| { + METRIC_META_GC_DATANODE_CALLS_TOTAL + .with_label_values(&["gc_regions", "error"]) + .inc(); + if region_count > 0 { + METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(region_count); + } + if first_error.is_none() { + first_error = Some(e); + } + }; + for (peer, gc_regions, region_count, report) in join_all(tasks).await { + match report { + Ok(mailbox_rx) => { + recv_tasks.push(async move { + let report = + recv_gc_regions_reply(&peer, &gc_regions, "Batch GC", mailbox_rx).await; + (peer, region_count, report) + }); + } + Err(e) => record_gc_error(e, region_count), + } + } + + for (peer, region_count, report) in join_all(recv_tasks).await { + let report = match report { + Ok(report) => { + METRIC_META_GC_DATANODE_CALLS_TOTAL + .with_label_values(&["gc_regions", "success"]) + .inc(); + let need_retry_count = report.need_retry_regions.len() as u64; + if need_retry_count > 0 { + METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count); + } + report + } + Err(e) => { + record_gc_error(e, region_count); + continue; + } + }; let success = report.deleted_files.keys().collect_vec(); let need_retry = report.need_retry_regions.iter().cloned().collect_vec(); @@ -768,6 +827,10 @@ impl BatchGcProcedure { all_report.merge(report); } + if let Some(e) = first_error { + return Err(e); + } + if !all_need_retry.is_empty() { warn!("Regions need retry after batch GC: {:?}", all_need_retry); }