chore: tracing for gc (#7723)

* chore: tracing for gc

Signed-off-by: discord9 <discord9@163.com>

* chore: rm some clone

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-02-26 14:37:29 +08:00
committed by GitHub
parent 0f6b8ff815
commit 46683f908a
4 changed files with 197 additions and 44 deletions

View File

@@ -18,6 +18,7 @@ use std::time::Instant;
use common_catalog::consts::MITO_ENGINE;
use common_meta::datanode::{RegionManifestInfo, RegionStat};
use common_meta::peer::Peer;
use common_telemetry::tracing::Instrument as _;
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use itertools::Itertools;
@@ -40,18 +41,40 @@ impl GcScheduler {
info!("Starting GC cycle");
// limit gc region scope to regions whose datanode have reported stats(by heartbeat)
let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
let table_to_region_stats = self
.ctx
.get_table_to_region_stats()
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_get_region_stats"
))
.await?;
info!(
"Fetched region stats for {} tables",
table_to_region_stats.len()
);
let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;
let per_table_candidates = self
.select_gc_candidates(&table_to_region_stats)
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_select_candidates"
))
.await?;
let table_reparts = self.ctx.get_table_reparts().await?;
let table_reparts = self
.ctx
.get_table_reparts()
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_get_table_reparts"
))
.await?;
let dropped_collector =
DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
let dropped_assignment = dropped_collector.collect_and_assign(&table_reparts).await?;
let dropped_assignment = dropped_collector
.collect_and_assign(&table_reparts)
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_collect_dropped_regions"
))
.await?;
let candidate_count: usize = per_table_candidates.values().map(|c| c.len()).sum();
let dropped_count: usize = dropped_assignment
.regions_by_peer
@@ -67,6 +90,9 @@ impl GcScheduler {
let mut datanode_to_candidates = self
.aggregate_candidates_by_datanode(per_table_candidates)
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_aggregate_by_datanode"
))
.await?;
self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment);
@@ -82,6 +108,9 @@ impl GcScheduler {
dropped_assignment.force_full_listing,
dropped_assignment.region_routes_override,
)
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_dispatch_to_datanodes"
))
.await;
let duration = start_time.elapsed();
@@ -280,6 +309,12 @@ impl GcScheduler {
self.config.mailbox_timeout,
region_routes_override.clone(),
)
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_call_datanode",
peer = %peer,
mode = "fast",
region_count = fast_list_regions.len()
))
.await
{
Ok(report) => combined_report.merge(report),
@@ -306,6 +341,12 @@ impl GcScheduler {
self.config.mailbox_timeout,
region_routes_override,
)
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_call_datanode",
peer = %peer,
mode = "full",
region_count = need_full_list_regions.len()
))
.await
{
Ok(report) => combined_report.merge(report),

View File

@@ -29,6 +29,7 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use common_telemetry::tracing::Instrument as _;
use common_telemetry::{debug, error, info, warn};
use futures::future::join_all;
use itertools::Itertools as _;
@@ -524,7 +525,13 @@ impl BatchGcProcedure {
/// Get file references from all datanodes that host the regions
async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
self.set_routes_and_related_regions().await?;
let region_count = self.data.regions.len();
self.set_routes_and_related_regions()
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_procedure_prepare_routes",
region_count = region_count
))
.await?;
let query_regions = &self.data.regions;
let related_regions = &self.data.related_regions;
@@ -848,14 +855,48 @@ impl Procedure for BatchGcProcedure {
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
State::Start => {
let _regions_span = common_telemetry::tracing::debug_span!(
"meta_gc_procedure_regions",
state = "start",
regions = ?self.data.regions
)
.entered();
info!(
"Batch GC procedure transitioning from Start to Acquiring for {} regions",
self.data.regions.len()
);
// Transition to Acquiring state
self.data.state = State::Acquiring;
Ok(Status::executing(false))
}
State::Acquiring => {
let region_count = self.data.regions.len();
let full_file_listing = self.data.full_file_listing;
let regions = self.data.regions.clone();
info!(
"Batch GC procedure acquiring file references for {} regions",
region_count
);
// Get file references from all datanodes
match self.get_file_references().await {
match self
.get_file_references()
.instrument(common_telemetry::tracing::debug_span!(
"meta_gc_procedure_regions",
state = "acquiring",
regions = ?regions
))
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_procedure_get_file_references",
region_count = region_count,
full_file_listing = full_file_listing
))
.await
{
Ok(file_refs) => {
info!(
"Batch GC procedure acquired file references for {} regions",
file_refs.file_refs.len()
);
self.data.file_refs = file_refs;
self.data.state = State::Gcing;
Ok(Status::executing(false))
@@ -867,10 +908,31 @@ impl Procedure for BatchGcProcedure {
}
}
State::Gcing => {
info!(
"Batch GC procedure sending GC instructions for {} regions",
self.data.regions.len()
);
// Send GC instructions to all datanodes
// TODO(discord9): handle need-retry regions
match self.send_gc_instructions().await {
match self
.send_gc_instructions()
.instrument(common_telemetry::tracing::debug_span!(
"meta_gc_procedure_regions",
state = "gcing",
regions = ?self.data.regions
))
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_procedure_send_gc_instructions",
region_count = self.data.regions.len(),
full_file_listing = self.data.full_file_listing
))
.await
{
Ok(report) => {
info!(
"Batch GC procedure received GC report, retry region count: {}",
report.need_retry_regions.len()
);
self.data.state = State::UpdateRepartition;
self.data.gc_report = Some(report);
Ok(Status::executing(false))
@@ -881,9 +943,21 @@ impl Procedure for BatchGcProcedure {
}
}
}
State::UpdateRepartition => match self.cleanup_region_repartition(ctx).await {
State::UpdateRepartition => match self
.cleanup_region_repartition(ctx)
.instrument(common_telemetry::tracing::debug_span!(
"meta_gc_procedure_regions",
state = "update_repartition",
regions = ?self.data.regions
))
.instrument(common_telemetry::tracing::info_span!(
"meta_gc_procedure_update_repartition",
region_count = self.data.regions.len()
))
.await
{
Ok(()) => {
info!(
debug!(
"Cleanup region repartition info completed successfully for regions {:?}",
self.data.regions
);

View File

@@ -19,6 +19,7 @@ use std::time::Instant;
use common_meta::DatanodeId;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::tracing::Instrument as _;
use common_telemetry::{error, info};
use store_api::storage::GcReport;
use tokio::sync::mpsc::{Receiver, Sender};
@@ -123,13 +124,17 @@ impl GcScheduler {
match event {
Event::Tick => {
info!("Received gc tick");
if let Err(e) = self.handle_tick().await {
let span =
common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "ticker");
if let Err(e) = self.handle_tick().instrument(span).await {
error!(e; "Failed to handle gc tick");
}
}
Event::Manually(sender) => {
info!("Received manually gc request");
match self.handle_tick().await {
let span =
common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual");
match self.handle_tick().instrument(span).await {
Ok(report) => {
// ignore error
let _ = sender.send(report);
@@ -147,7 +152,8 @@ impl GcScheduler {
METRIC_META_GC_SCHEDULER_CYCLES_TOTAL.inc();
let _timer = METRIC_META_GC_SCHEDULER_DURATION_SECONDS.start_timer();
info!("Start to trigger gc");
let report = self.trigger_gc().await?;
let span = common_telemetry::tracing::info_span!("meta_gc_handle_tick");
let report = self.trigger_gc().instrument(span).await?;
// Periodically clean up stale tracker entries
self.cleanup_tracker_if_needed().await?;

View File

@@ -26,6 +26,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_meta::datanode::GcStat;
use common_telemetry::tracing::Instrument as _;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use itertools::Itertools;
@@ -268,6 +269,10 @@ impl LocalGcWorker {
/// may cause too many concurrent listing operations.
///
/// TODO(discord9): consider instead running in parallel mode
#[common_telemetry::tracing::instrument(
skip_all,
fields(region_count = self.regions.len(), full_file_listing = self.full_file_listing)
)]
pub async fn run(self) -> Result<GcReport> {
info!("LocalGcWorker started");
let _timer = GC_DURATION_SECONDS
@@ -336,6 +341,14 @@ impl LocalGcWorker {
///
/// Note that the files that are still in use or may still be kept for a while are not deleted
/// to avoid deleting files that are still needed.
#[common_telemetry::tracing::instrument(
skip_all,
fields(
region_id = %region_id,
full_file_listing = self.full_file_listing,
region_present = region.is_some()
)
)]
pub async fn do_region_gc(
&self,
region_id: RegionId,
@@ -457,7 +470,7 @@ impl LocalGcWorker {
let unused_file_cnt = deletable_files.len();
info!(
"gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {:?}",
"gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete count: {}",
if region.is_none() {
"(region dropped)"
} else {
@@ -466,7 +479,11 @@ impl LocalGcWorker {
current_files.map(|c| c.len()).unwrap_or(0),
tmp_ref_files.len(),
removed_file_cnt,
&deletable_files,
deletable_files.len(),
);
debug!(
"gc: deletable files for region {}: {:?}",
region_id, &deletable_files
);
debug!(
@@ -495,6 +512,10 @@ impl LocalGcWorker {
Ok(deletable_files)
}
#[common_telemetry::tracing::instrument(
skip_all,
fields(region_id = %region_id, removed_file_count = removed_files.len())
)]
async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
let mut index_ids = vec![];
let file_pairs = removed_files
@@ -543,6 +564,10 @@ impl LocalGcWorker {
}
/// Update region manifest for clear the actually deleted files
#[common_telemetry::tracing::instrument(
skip_all,
fields(region_id = %region.region_id(), deleted_file_count = deleted_files.len())
)]
async fn update_manifest_removed_files(
&self,
region: &MitoRegionRef,
@@ -618,6 +643,10 @@ impl LocalGcWorker {
/// List all files in the region directory.
/// Returns a vector of all file entries found.
/// This might take a long time if there are many files in the region directory.
#[common_telemetry::tracing::instrument(
skip_all,
fields(region_id = %region_id, file_cnt_hint = file_cnt_hint)
)]
async fn list_from_object_store(
&self,
region_id: RegionId,
@@ -666,38 +695,41 @@ impl LocalGcWorker {
for (lister, end) in listers {
let tx = tx.clone();
let handle = tokio::spawn(async move {
let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
Ok(e) => {
if let Some(end) = &end {
// reach end, stop listing
e.name() < end.as_str()
} else {
// no end, take all entries
let handle = tokio::spawn(
async move {
let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
Ok(e) => {
if let Some(end) = &end {
// reach end, stop listing
e.name() < end.as_str()
} else {
// no end, take all entries
true
}
}
// entry went wrong, log and skip it
Err(err) => {
warn!("Failed to list entry: {}", err);
true
}
}
// entry went wrong, log and skip it
Err(err) => {
warn!("Failed to list entry: {}", err);
true
}
});
let stream = stream
.filter(|e| {
if let Ok(e) = &e {
// notice that we only care about files, skip dirs
e.metadata().is_file()
} else {
// error entry, take for further logging
true
}
})
.collect::<Vec<_>>()
.await;
// ordering of files doesn't matter here, so we can send them directly
tx.send(stream).await.expect("Failed to send entries");
});
});
let stream = stream
.filter(|e| {
if let Ok(e) = &e {
// notice that we only care about files, skip dirs
e.metadata().is_file()
} else {
// error entry, take for further logging
true
}
})
.collect::<Vec<_>>()
.await;
// ordering of files doesn't matter here, so we can send them directly
tx.send(stream).await.expect("Failed to send entries");
}
.instrument(common_telemetry::tracing::info_span!("gc_list_partition")),
);
handles.push(handle);
}