fix: gc scheduler sends heartbeat instructions in parallel (#7716)

* fix: send heartbeat instructions in parallel

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Authored by discord9 on 2026-02-25 16:27:40 +08:00, committed by GitHub
parent df04267c54
commit 58123ee5a6
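The heart of the change is splitting each mailbox helper into a send half that returns a MailboxReceiver and a receive half that awaits the reply, so the procedure can fan an instruction out to every datanode before waiting on any single reply. Below is a minimal, self-contained sketch of that two-phase pattern; the names and the oneshot channel are illustrative stand-ins, not the actual GreptimeDB mailbox API.

    use futures::future::join_all;
    use tokio::sync::oneshot;

    // Phase-1 helper: fire the instruction and hand back a receiver instead of
    // awaiting the reply inline (stand-in for send_*_inner returning MailboxReceiver).
    async fn send_instruction(peer_id: u64) -> Result<oneshot::Receiver<String>, String> {
        let (tx, rx) = oneshot::channel();
        // Simulate the datanode answering some time later.
        tokio::spawn(async move {
            let _ = tx.send(format!("reply from peer {peer_id}"));
        });
        Ok(rx)
    }

    #[tokio::main]
    async fn main() {
        let peers = vec![1u64, 2, 3];

        // Phase 1: send to every peer; each task resolves to (peer, Result<receiver>).
        let sends = peers
            .into_iter()
            .map(|peer| async move { (peer, send_instruction(peer).await) });

        let mut recv_tasks = Vec::new();
        let mut first_error = None;
        for (peer, sent) in join_all(sends).await {
            match sent {
                Ok(rx) => recv_tasks.push(async move { (peer, rx.await) }),
                // Record the failure but keep going so the other peers are unaffected.
                Err(e) => {
                    if first_error.is_none() {
                        first_error = Some(e);
                    }
                }
            }
        }

        // Phase 2: await all replies concurrently.
        for (peer, reply) in join_all(recv_tasks).await {
            println!("peer {peer}: {reply:?}");
        }

        if let Some(e) = first_error {
            eprintln!("first error: {e}");
        }
    }

With the old helpers each peer's round trip completed before the next send went out, so total latency was the sum over all peers; with the split it is roughly the latency of the slowest peer.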


@@ -30,6 +30,7 @@ use common_procedure::{
     Result as ProcedureResult, Status,
 };
 use common_telemetry::{debug, error, info, warn};
+use futures::future::join_all;
 use itertools::Itertools as _;
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt as _;
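Among the imports, only futures::future::join_all is new in this hunk. It drives a whole collection of futures concurrently and resolves to a Vec of their outputs in input order, which both refactored call sites below rely on. A tiny self-contained refresher, assuming a Tokio runtime:

    use futures::future::join_all;

    #[tokio::main]
    async fn main() {
        // join_all polls every future concurrently and yields a Vec of results
        // in the same order as the input iterator.
        let futs = (1..=3).map(|n| async move { n * 2 });
        assert_eq!(join_all(futs).await, vec![2, 4, 6]);
    }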
@@ -41,29 +42,7 @@ use crate::gc::util::table_route_to_region;
 use crate::gc::{Peer2Regions, Region2Peers};
 use crate::handler::HeartbeatMailbox;
 use crate::metrics::{METRIC_META_GC_DATANODE_CALLS_TOTAL, METRIC_META_GC_FAILED_REGIONS_TOTAL};
-use crate::service::mailbox::{Channel, MailboxRef};
-
-/// Helper function to send GetFileRefs instruction and wait for reply.
-async fn send_get_file_refs(
-    mailbox: &MailboxRef,
-    server_addr: &str,
-    peer: &Peer,
-    instruction: GetFileRefs,
-    timeout: Duration,
-) -> Result<GetFileRefsReply> {
-    let result = send_get_file_refs_inner(mailbox, server_addr, peer, instruction, timeout).await;
-    match &result {
-        Ok(_) => METRIC_META_GC_DATANODE_CALLS_TOTAL
-            .with_label_values(&["get_file_refs", "success"])
-            .inc(),
-        Err(_) => METRIC_META_GC_DATANODE_CALLS_TOTAL
-            .with_label_values(&["get_file_refs", "error"])
-            .inc(),
-    }
-    result
-}
+use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
 
 async fn send_get_file_refs_inner(
     mailbox: &MailboxRef,
@@ -71,7 +50,7 @@ async fn send_get_file_refs_inner(
     peer: &Peer,
     instruction: GetFileRefs,
     timeout: Duration,
-) -> Result<GetFileRefsReply> {
+) -> Result<MailboxReceiver> {
     let instruction = instruction::Instruction::GetFileRefs(instruction);
     let msg = MailboxMessage::json_message(
         &format!("Get file references: {}", instruction),
@@ -84,10 +63,15 @@ async fn send_get_file_refs_inner(
         input: instruction.to_string(),
     })?;
 
-    let mailbox_rx = mailbox
+    mailbox
         .send(&Channel::Datanode(peer.id), msg, timeout)
-        .await?;
-
+        .await
+}
+
+async fn recv_get_file_refs_reply(
+    peer: &Peer,
+    mailbox_rx: MailboxReceiver,
+) -> Result<GetFileRefsReply> {
     let reply = match mailbox_rx.await {
         Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
         Err(e) => {
@@ -110,52 +94,14 @@ async fn send_get_file_refs_inner(
     Ok(reply)
 }
 
-/// Helper function to send GcRegions instruction and wait for reply.
-async fn send_gc_regions(
-    mailbox: &MailboxRef,
-    peer: &Peer,
-    gc_regions: GcRegions,
-    server_addr: &str,
-    timeout: Duration,
-    description: &str,
-) -> Result<GcReport> {
-    let failed_region_count = gc_regions.regions.len() as u64;
-    let result =
-        send_gc_regions_inner(mailbox, peer, gc_regions, server_addr, timeout, description).await;
-    match result {
-        Ok(report) => {
-            METRIC_META_GC_DATANODE_CALLS_TOTAL
-                .with_label_values(&["gc_regions", "success"])
-                .inc();
-            let need_retry_count = report.need_retry_regions.len() as u64;
-            if need_retry_count > 0 {
-                METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count);
-            }
-            Ok(report)
-        }
-        Err(e) => {
-            METRIC_META_GC_DATANODE_CALLS_TOTAL
-                .with_label_values(&["gc_regions", "error"])
-                .inc();
-            if failed_region_count > 0 {
-                METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(failed_region_count);
-            }
-            Err(e)
-        }
-    }
-}
-
 async fn send_gc_regions_inner(
     mailbox: &MailboxRef,
     peer: &Peer,
-    gc_regions: GcRegions,
+    gc_regions: &GcRegions,
     server_addr: &str,
     timeout: Duration,
     description: &str,
-) -> Result<GcReport> {
+) -> Result<MailboxReceiver> {
     let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
     let msg = MailboxMessage::json_message(
         &format!("{}: {}", description, instruction),
@@ -168,10 +114,17 @@ async fn send_gc_regions_inner(
         input: instruction.to_string(),
     })?;
 
-    let mailbox_rx = mailbox
+    mailbox
         .send(&Channel::Datanode(peer.id), msg, timeout)
-        .await?;
-
+        .await
+}
+
+async fn recv_gc_regions_reply(
+    peer: &Peer,
+    gc_regions: &GcRegions,
+    description: &str,
+    mailbox_rx: MailboxReceiver,
+) -> Result<GcReport> {
     let reply = match mailbox_rx.await {
         Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
         Err(e) => {
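With the wrapper helpers gone, callers compose the two halves themselves: awaiting the receiver immediately after sending reproduces the old sequential behavior, while holding several receivers at once enables the parallel path used below. A schematic of that composition with stand-in types; the real signatures take the mailbox, peer, timeout, and instruction arguments shown in the diff:

    // Stand-ins so the sketch compiles on its own; the real types live in
    // crate::service::mailbox and the instruction module.
    struct MailboxReceiver;
    struct GcReport;

    async fn send_gc_regions_inner_stub() -> Result<MailboxReceiver, String> {
        Ok(MailboxReceiver)
    }

    async fn recv_gc_regions_reply_stub(_rx: MailboxReceiver) -> Result<GcReport, String> {
        Ok(GcReport)
    }

    // Equivalent of the removed one-shot behavior: send, then wait immediately.
    // Keeping the receiver around instead is what makes the parallel path possible.
    async fn send_and_wait() -> Result<GcReport, String> {
        let rx = send_gc_regions_inner_stub().await?;
        recv_gc_regions_reply_stub(rx).await
    }

    #[tokio::main]
    async fn main() {
        assert!(send_and_wait().await.is_ok());
    }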
@@ -636,6 +589,10 @@ impl BatchGcProcedure {
         peers.extend(datanode2query_regions.keys().cloned());
         peers.extend(datanode2related_regions.keys().cloned());
 
+        let mailbox = &self.mailbox;
+        let server_addr = &self.data.server_addr;
+        let mut tasks = Vec::new();
+
         for peer in peers {
             let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
             let related_regions_for_peer =
@@ -645,33 +602,75 @@ impl BatchGcProcedure {
                 continue;
             }
 
-            let instruction = GetFileRefs {
-                query_regions: regions.clone(),
-                related_regions: related_regions_for_peer.clone(),
-            };
+            tasks.push(async move {
+                let instruction = GetFileRefs {
+                    query_regions: regions.clone(),
+                    related_regions: related_regions_for_peer.clone(),
+                };
-            let reply = send_get_file_refs(
-                &self.mailbox,
-                &self.data.server_addr,
-                &peer,
-                instruction,
-                timeout,
-            )
-            .await?;
+                let reply =
+                    send_get_file_refs_inner(mailbox, server_addr, &peer, instruction, timeout)
+                        .await;
+                (peer, regions, related_regions_for_peer, reply)
+            });
+        }
+
+        let mut recv_tasks = Vec::new();
+        // store the first error instead of returning early, so metrics are still
+        // recorded for the remaining peers
+        let mut first_error = None;
+        let mut record_get_file_refs_error = |e| {
+            METRIC_META_GC_DATANODE_CALLS_TOTAL
+                .with_label_values(&["get_file_refs", "error"])
+                .inc();
+            if first_error.is_none() {
+                first_error = Some(e);
+            }
+        };
+        for (peer, regions, related_regions_for_peer, reply) in join_all(tasks).await {
+            match reply {
+                Ok(mailbox_rx) => {
+                    recv_tasks.push(async move {
+                        let reply = recv_get_file_refs_reply(&peer, mailbox_rx).await;
+                        (peer, regions, related_regions_for_peer, reply)
+                    });
+                }
+                Err(e) => record_get_file_refs_error(e),
+            }
+        }
+
+        let replies = join_all(recv_tasks).await;
+        for (peer, regions, related_regions_for_peer, reply) in replies {
+            let reply = match reply {
+                Ok(reply) => reply,
+                Err(e) => {
+                    record_get_file_refs_error(e);
+                    continue;
+                }
+            };
             debug!(
                 "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}",
                 peer, regions, related_regions_for_peer, reply
             );
+
             if !reply.success {
-                return error::UnexpectedSnafu {
+                METRIC_META_GC_DATANODE_CALLS_TOTAL
+                    .with_label_values(&["get_file_refs", "error"])
+                    .inc();
+                let err = error::UnexpectedSnafu {
                     violated: format!(
                         "Failed to get file references from datanode {}: {:?}",
                         peer, reply.error
                     ),
                 }
-                .fail();
+                .build();
+                record_get_file_refs_error(err);
+                continue;
             }
+            METRIC_META_GC_DATANODE_CALLS_TOTAL
+                .with_label_values(&["get_file_refs", "success"])
+                .inc();
 
             // Merge the file references from this datanode
             for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
@@ -695,6 +694,10 @@ impl BatchGcProcedure {
             }
         }
 
+        if let Some(e) = first_error {
+            return Err(e);
+        }
+
         Ok(FileRefsManifest {
             file_refs: all_file_refs,
             manifest_version: all_manifest_versions,
@@ -731,24 +734,80 @@ impl BatchGcProcedure {
         }
 
         let mut all_need_retry = HashSet::new();
 
         // Send GC instructions to each datanode
-        for (peer, regions_for_peer) in datanode2regions {
-            let gc_regions = GcRegions {
-                regions: regions_for_peer.clone(),
-                // file_refs_manifest can be large; cloning for each datanode is acceptable here since this is an admin-only operation.
-                file_refs_manifest: file_refs.clone(),
-                full_file_listing: self.data.full_file_listing,
-            };
+        let mailbox = &self.mailbox;
+        let server_addr = self.data.server_addr.as_str();
+        let full_file_listing = self.data.full_file_listing;
+        let tasks = datanode2regions
+            .into_iter()
+            .map(|(peer, regions_for_peer)| {
+                let gc_regions = GcRegions {
+                    regions: regions_for_peer.clone(),
+                    // file_refs_manifest could be somewhat large, but we still intentionally
+                    // clone it per datanode: this path is admin-triggered or scheduler-triggered,
+                    // peer count is expected to be bounded, and abnormal manifest growth should
+                    // be addressed at the source.
+                    file_refs_manifest: file_refs.clone(),
+                    full_file_listing,
+                };
+                let region_count = gc_regions.regions.len() as u64;
-            let report = send_gc_regions(
-                &self.mailbox,
-                &peer,
-                gc_regions,
-                self.data.server_addr.as_str(),
-                timeout,
-                "Batch GC",
-            )
-            .await?;
+                async move {
+                    let report = send_gc_regions_inner(
+                        mailbox,
+                        &peer,
+                        &gc_regions,
+                        server_addr,
+                        timeout,
+                        "Batch GC",
+                    )
+                    .await;
+                    (peer, gc_regions, region_count, report)
+                }
+            });
+
+        let mut recv_tasks = Vec::new();
+        let mut first_error = None;
+        let mut record_gc_error = |e, region_count| {
+            METRIC_META_GC_DATANODE_CALLS_TOTAL
+                .with_label_values(&["gc_regions", "error"])
+                .inc();
+            if region_count > 0 {
+                METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(region_count);
+            }
+            if first_error.is_none() {
+                first_error = Some(e);
+            }
+        };
+        for (peer, gc_regions, region_count, report) in join_all(tasks).await {
+            match report {
+                Ok(mailbox_rx) => {
+                    recv_tasks.push(async move {
+                        let report =
+                            recv_gc_regions_reply(&peer, &gc_regions, "Batch GC", mailbox_rx).await;
+                        (peer, region_count, report)
+                    });
+                }
+                Err(e) => record_gc_error(e, region_count),
+            }
+        }
+
+        for (peer, region_count, report) in join_all(recv_tasks).await {
+            let report = match report {
+                Ok(report) => {
+                    METRIC_META_GC_DATANODE_CALLS_TOTAL
+                        .with_label_values(&["gc_regions", "success"])
+                        .inc();
+                    let need_retry_count = report.need_retry_regions.len() as u64;
+                    if need_retry_count > 0 {
+                        METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count);
+                    }
+                    report
+                }
+                Err(e) => {
+                    record_gc_error(e, region_count);
+                    continue;
+                }
+            };
 
             let success = report.deleted_files.keys().collect_vec();
             let need_retry = report.need_retry_regions.iter().cloned().collect_vec();
@@ -768,6 +827,10 @@ impl BatchGcProcedure {
             all_report.merge(report);
         }
 
+        if let Some(e) = first_error {
+            return Err(e);
+        }
+
         if !all_need_retry.is_empty() {
             warn!("Regions need retry after batch GC: {:?}", all_need_retry);
         }
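Both call sites share the error-handling convention introduced here: every peer is still processed so the per-peer metrics stay complete, and only the first captured error is surfaced once the loop finishes. The pattern in isolation, with plain types standing in for the mailbox machinery:

    // First-error accumulation: process everything, record metrics per item,
    // then fail with the earliest error if there was one.
    fn process_all(results: Vec<Result<u32, String>>) -> Result<u32, String> {
        let mut first_error = None;
        let mut total = 0;
        for result in results {
            match result {
                Ok(n) => total += n, // e.g. bump the "success" counter here
                Err(e) => {
                    // e.g. bump the "error" counter here, then keep going
                    if first_error.is_none() {
                        first_error = Some(e);
                    }
                }
            }
        }
        if let Some(e) = first_error {
            return Err(e);
        }
        Ok(total)
    }

    fn main() {
        let out = process_all(vec![Ok(1), Err("peer 2 timed out".into()), Ok(3)]);
        assert_eq!(out, Err("peer 2 timed out".to_string()));
    }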