From 46683f908a7d9f942dee855cb7fac1f5e66d946d Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 26 Feb 2026 14:37:29 +0800 Subject: [PATCH] chore: tracing for gc (#7723) * chore: tracing for gc Signed-off-by: discord9 * chore: rm some clone Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/meta-srv/src/gc/handler.rs | 49 ++++++++++++++-- src/meta-srv/src/gc/procedure.rs | 84 ++++++++++++++++++++++++++-- src/meta-srv/src/gc/scheduler.rs | 12 +++- src/mito2/src/gc.rs | 96 +++++++++++++++++++++----------- 4 files changed, 197 insertions(+), 44 deletions(-) diff --git a/src/meta-srv/src/gc/handler.rs b/src/meta-srv/src/gc/handler.rs index 32e67e1d71..ccbfd00f6d 100644 --- a/src/meta-srv/src/gc/handler.rs +++ b/src/meta-srv/src/gc/handler.rs @@ -18,6 +18,7 @@ use std::time::Instant; use common_catalog::consts::MITO_ENGINE; use common_meta::datanode::{RegionManifestInfo, RegionStat}; use common_meta::peer::Peer; +use common_telemetry::tracing::Instrument as _; use common_telemetry::{debug, error, info, warn}; use futures::StreamExt; use itertools::Itertools; @@ -40,18 +41,40 @@ impl GcScheduler { info!("Starting GC cycle"); // limit gc region scope to regions whose datanode have reported stats(by heartbeat) - let table_to_region_stats = self.ctx.get_table_to_region_stats().await?; + let table_to_region_stats = self + .ctx + .get_table_to_region_stats() + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_get_region_stats" + )) + .await?; info!( "Fetched region stats for {} tables", table_to_region_stats.len() ); - let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?; + let per_table_candidates = self + .select_gc_candidates(&table_to_region_stats) + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_select_candidates" + )) + .await?; - let table_reparts = self.ctx.get_table_reparts().await?; + let table_reparts = self + .ctx + .get_table_reparts() + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_get_table_reparts" + )) + .await?; let dropped_collector = DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker); - let dropped_assignment = dropped_collector.collect_and_assign(&table_reparts).await?; + let dropped_assignment = dropped_collector + .collect_and_assign(&table_reparts) + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_collect_dropped_regions" + )) + .await?; let candidate_count: usize = per_table_candidates.values().map(|c| c.len()).sum(); let dropped_count: usize = dropped_assignment .regions_by_peer @@ -67,6 +90,9 @@ impl GcScheduler { let mut datanode_to_candidates = self .aggregate_candidates_by_datanode(per_table_candidates) + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_aggregate_by_datanode" + )) .await?; self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment); @@ -82,6 +108,9 @@ impl GcScheduler { dropped_assignment.force_full_listing, dropped_assignment.region_routes_override, ) + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_dispatch_to_datanodes" + )) .await; let duration = start_time.elapsed(); @@ -280,6 +309,12 @@ impl GcScheduler { self.config.mailbox_timeout, region_routes_override.clone(), ) + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_call_datanode", + peer = %peer, + mode = "fast", + region_count = fast_list_regions.len() + )) .await { Ok(report) => combined_report.merge(report), @@ -306,6 +341,12 @@ impl GcScheduler { self.config.mailbox_timeout, region_routes_override, ) + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_call_datanode", + peer = %peer, + mode = "full", + region_count = need_full_list_regions.len() + )) .await { Ok(report) => combined_report.merge(report), diff --git a/src/meta-srv/src/gc/procedure.rs b/src/meta-srv/src/gc/procedure.rs index 1302191be8..7f9f655f44 100644 --- a/src/meta-srv/src/gc/procedure.rs +++ b/src/meta-srv/src/gc/procedure.rs @@ -29,6 +29,7 @@ use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, }; +use common_telemetry::tracing::Instrument as _; use common_telemetry::{debug, error, info, warn}; use futures::future::join_all; use itertools::Itertools as _; @@ -524,7 +525,13 @@ impl BatchGcProcedure { /// Get file references from all datanodes that host the regions async fn get_file_references(&mut self) -> Result { - self.set_routes_and_related_regions().await?; + let region_count = self.data.regions.len(); + self.set_routes_and_related_regions() + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_procedure_prepare_routes", + region_count = region_count + )) + .await?; let query_regions = &self.data.regions; let related_regions = &self.data.related_regions; @@ -848,14 +855,48 @@ impl Procedure for BatchGcProcedure { async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { match self.data.state { State::Start => { + let _regions_span = common_telemetry::tracing::debug_span!( + "meta_gc_procedure_regions", + state = "start", + regions = ?self.data.regions + ) + .entered(); + info!( + "Batch GC procedure transitioning from Start to Acquiring for {} regions", + self.data.regions.len() + ); // Transition to Acquiring state self.data.state = State::Acquiring; Ok(Status::executing(false)) } State::Acquiring => { + let region_count = self.data.regions.len(); + let full_file_listing = self.data.full_file_listing; + let regions = self.data.regions.clone(); + info!( + "Batch GC procedure acquiring file references for {} regions", + region_count + ); // Get file references from all datanodes - match self.get_file_references().await { + match self + .get_file_references() + .instrument(common_telemetry::tracing::debug_span!( + "meta_gc_procedure_regions", + state = "acquiring", + regions = ?regions + )) + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_procedure_get_file_references", + region_count = region_count, + full_file_listing = full_file_listing + )) + .await + { Ok(file_refs) => { + info!( + "Batch GC procedure acquired file references for {} regions", + file_refs.file_refs.len() + ); self.data.file_refs = file_refs; self.data.state = State::Gcing; Ok(Status::executing(false)) @@ -867,10 +908,31 @@ impl Procedure for BatchGcProcedure { } } State::Gcing => { + info!( + "Batch GC procedure sending GC instructions for {} regions", + self.data.regions.len() + ); // Send GC instructions to all datanodes // TODO(discord9): handle need-retry regions - match self.send_gc_instructions().await { + match self + .send_gc_instructions() + .instrument(common_telemetry::tracing::debug_span!( + "meta_gc_procedure_regions", + state = "gcing", + regions = ?self.data.regions + )) + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_procedure_send_gc_instructions", + region_count = self.data.regions.len(), + full_file_listing = self.data.full_file_listing + )) + .await + { Ok(report) => { + info!( + "Batch GC procedure received GC report, retry region count: {}", + report.need_retry_regions.len() + ); self.data.state = State::UpdateRepartition; self.data.gc_report = Some(report); Ok(Status::executing(false)) @@ -881,9 +943,21 @@ impl Procedure for BatchGcProcedure { } } } - State::UpdateRepartition => match self.cleanup_region_repartition(ctx).await { + State::UpdateRepartition => match self + .cleanup_region_repartition(ctx) + .instrument(common_telemetry::tracing::debug_span!( + "meta_gc_procedure_regions", + state = "update_repartition", + regions = ?self.data.regions + )) + .instrument(common_telemetry::tracing::info_span!( + "meta_gc_procedure_update_repartition", + region_count = self.data.regions.len() + )) + .await + { Ok(()) => { - info!( + debug!( "Cleanup region repartition info completed successfully for regions {:?}", self.data.regions ); diff --git a/src/meta-srv/src/gc/scheduler.rs b/src/meta-srv/src/gc/scheduler.rs index 648539f27f..a275c64452 100644 --- a/src/meta-srv/src/gc/scheduler.rs +++ b/src/meta-srv/src/gc/scheduler.rs @@ -19,6 +19,7 @@ use std::time::Instant; use common_meta::DatanodeId; use common_meta::key::TableMetadataManagerRef; use common_procedure::ProcedureManagerRef; +use common_telemetry::tracing::Instrument as _; use common_telemetry::{error, info}; use store_api::storage::GcReport; use tokio::sync::mpsc::{Receiver, Sender}; @@ -123,13 +124,17 @@ impl GcScheduler { match event { Event::Tick => { info!("Received gc tick"); - if let Err(e) = self.handle_tick().await { + let span = + common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "ticker"); + if let Err(e) = self.handle_tick().instrument(span).await { error!(e; "Failed to handle gc tick"); } } Event::Manually(sender) => { info!("Received manually gc request"); - match self.handle_tick().await { + let span = + common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual"); + match self.handle_tick().instrument(span).await { Ok(report) => { // ignore error let _ = sender.send(report); @@ -147,7 +152,8 @@ impl GcScheduler { METRIC_META_GC_SCHEDULER_CYCLES_TOTAL.inc(); let _timer = METRIC_META_GC_SCHEDULER_DURATION_SECONDS.start_timer(); info!("Start to trigger gc"); - let report = self.trigger_gc().await?; + let span = common_telemetry::tracing::info_span!("meta_gc_handle_tick"); + let report = self.trigger_gc().instrument(span).await?; // Periodically clean up stale tracker entries self.cleanup_tracker_if_needed().await?; diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index 3866c8892f..cae67637be 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use std::time::Duration; use common_meta::datanode::GcStat; +use common_telemetry::tracing::Instrument as _; use common_telemetry::{debug, error, info, warn}; use common_time::Timestamp; use itertools::Itertools; @@ -268,6 +269,10 @@ impl LocalGcWorker { /// may cause too many concurrent listing operations. /// /// TODO(discord9): consider instead running in parallel mode + #[common_telemetry::tracing::instrument( + skip_all, + fields(region_count = self.regions.len(), full_file_listing = self.full_file_listing) + )] pub async fn run(self) -> Result { info!("LocalGcWorker started"); let _timer = GC_DURATION_SECONDS @@ -336,6 +341,14 @@ impl LocalGcWorker { /// /// Note that the files that are still in use or may still be kept for a while are not deleted /// to avoid deleting files that are still needed. + #[common_telemetry::tracing::instrument( + skip_all, + fields( + region_id = %region_id, + full_file_listing = self.full_file_listing, + region_present = region.is_some() + ) + )] pub async fn do_region_gc( &self, region_id: RegionId, @@ -457,7 +470,7 @@ impl LocalGcWorker { let unused_file_cnt = deletable_files.len(); info!( - "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {:?}", + "gc: for region{}{region_id}: In manifest file cnt: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete count: {}", if region.is_none() { "(region dropped)" } else { @@ -466,7 +479,11 @@ impl LocalGcWorker { current_files.map(|c| c.len()).unwrap_or(0), tmp_ref_files.len(), removed_file_cnt, - &deletable_files, + deletable_files.len(), + ); + debug!( + "gc: deletable files for region {}: {:?}", + region_id, &deletable_files ); debug!( @@ -495,6 +512,10 @@ impl LocalGcWorker { Ok(deletable_files) } + #[common_telemetry::tracing::instrument( + skip_all, + fields(region_id = %region_id, removed_file_count = removed_files.len()) + )] async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> { let mut index_ids = vec![]; let file_pairs = removed_files @@ -543,6 +564,10 @@ impl LocalGcWorker { } /// Update region manifest for clear the actually deleted files + #[common_telemetry::tracing::instrument( + skip_all, + fields(region_id = %region.region_id(), deleted_file_count = deleted_files.len()) + )] async fn update_manifest_removed_files( &self, region: &MitoRegionRef, @@ -618,6 +643,10 @@ impl LocalGcWorker { /// List all files in the region directory. /// Returns a vector of all file entries found. /// This might take a long time if there are many files in the region directory. + #[common_telemetry::tracing::instrument( + skip_all, + fields(region_id = %region_id, file_cnt_hint = file_cnt_hint) + )] async fn list_from_object_store( &self, region_id: RegionId, @@ -666,38 +695,41 @@ impl LocalGcWorker { for (lister, end) in listers { let tx = tx.clone(); - let handle = tokio::spawn(async move { - let stream = lister.take_while(|e: &std::result::Result| match e { - Ok(e) => { - if let Some(end) = &end { - // reach end, stop listing - e.name() < end.as_str() - } else { - // no end, take all entries + let handle = tokio::spawn( + async move { + let stream = lister.take_while(|e: &std::result::Result| match e { + Ok(e) => { + if let Some(end) = &end { + // reach end, stop listing + e.name() < end.as_str() + } else { + // no end, take all entries + true + } + } + // entry went wrong, log and skip it + Err(err) => { + warn!("Failed to list entry: {}", err); true } - } - // entry went wrong, log and skip it - Err(err) => { - warn!("Failed to list entry: {}", err); - true - } - }); - let stream = stream - .filter(|e| { - if let Ok(e) = &e { - // notice that we only care about files, skip dirs - e.metadata().is_file() - } else { - // error entry, take for further logging - true - } - }) - .collect::>() - .await; - // ordering of files doesn't matter here, so we can send them directly - tx.send(stream).await.expect("Failed to send entries"); - }); + }); + let stream = stream + .filter(|e| { + if let Ok(e) = &e { + // notice that we only care about files, skip dirs + e.metadata().is_file() + } else { + // error entry, take for further logging + true + } + }) + .collect::>() + .await; + // ordering of files doesn't matter here, so we can send them directly + tx.send(stream).await.expect("Failed to send entries"); + } + .instrument(common_telemetry::tracing::info_span!("gc_list_partition")), + ); handles.push(handle); }