From 758a1663257c5cb56617679ea7328f73a690b0da Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 24 Jun 2026 12:40:10 +0800 Subject: [PATCH] fix: include index files in GC listing (#8327) * fix: include index files in GC listing Signed-off-by: discord9 * chore: filter GC index listing to puffins Signed-off-by: discord9 * chore: simplify GC index listing stream Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/mito2/src/gc.rs | 76 +++++++++++++++++++++++++++---- src/store-api/src/storage/file.rs | 3 +- 2 files changed, 70 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index 5b35909c99..8b5aadf141 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -30,7 +30,7 @@ use common_telemetry::tracing::Instrument as _; use common_telemetry::{debug, error, info, warn}; use common_time::Timestamp; use itertools::Itertools; -use object_store::{Entry, Lister}; +use object_store::{Entry, ErrorKind, Lister}; use serde::{Deserialize, Serialize}; use snafu::{ResultExt as _, ensure}; use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId}; @@ -308,7 +308,14 @@ impl LocalGcWorker { .iter() .filter_map(|f| f.index_version().map(|v| (f.file_id(), v))) .collect_vec(); - deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect()); + let data_files = files + .into_iter() + .filter_map(|f| match f { + RemovedFile::File(file_id, _) => Some(file_id), + RemovedFile::Index(_, _) => None, + }) + .collect(); + deleted_files.insert(*region_id, data_files); deleted_indexes.insert(*region_id, index_files); processed_regions.insert(*region_id); debug!( @@ -671,22 +678,72 @@ impl LocalGcWorker { })?; let lister_cnt = listers.len(); - // Step 2: Concurrently list all files in the region directory - let all_entries = self + // Step 2: Concurrently list all parquet files in the region root directory + let mut all_entries = self .list_region_files_concurrent(listers) .await .inspect_err(|_| { GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc(); })?; - let cnt = all_entries.len(); + let root_cnt = all_entries.len(); + + // Step 2b: Flat-list region_dir/index/ for puffin files. + // This is NOT a recursive listing — we only list the index/ + // subdirectory to avoid scanning nested dirs/staging/blob/cache. + let index_entries = self + .list_region_index_files(region_id) + .await + .inspect_err(|_| { + GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc(); + })?; + let index_cnt = index_entries.len(); + all_entries.extend(index_entries); info!( - "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.", + "gc: full listing mode cost {} secs using {lister_cnt} lister for root={root_cnt} index={index_cnt} files in region {}.", start.elapsed().as_secs_f64(), region_id ); Ok(all_entries) } + /// Flat-list puffin files from `region_dir/index/`. + /// If the index directory does not exist, returns an empty vec without error. + /// Only `.puffin` files (not subdirectories) are included. + async fn list_region_index_files(&self, region_id: RegionId) -> Result> { + let region_dir = self.access_layer.build_region_dir(region_id); + let index_dir = object_store::util::join_dir(®ion_dir, "index"); + + let mut lister = match self + .access_layer + .object_store() + .lister_with(&index_dir) + .await + { + Ok(l) => l, + Err(e) if e.kind() == ErrorKind::NotFound => { + // Index dir may not exist — that's fine, just log and return empty. + // object-store backends (especially filesystem) may error on + // non-existent directories. + debug!( + "Index directory not found for region {}: {}. Treating as empty.", + region_id, e + ); + return Ok(vec![]); + } + Err(e) => return Err(e).context(OpenDalSnafu), + }; + + let mut entries = Vec::new(); + while let Some(entry) = lister.next().await { + let entry = entry.context(OpenDalSnafu)?; + if entry.metadata().is_file() && entry.name().ends_with(".puffin") { + entries.push(entry); + } + } + + Ok(entries) + } + /// Concurrently list all files in the region directory using the provided listers. /// Returns a vector of all file entries found across all partitions. async fn list_region_files_concurrent( @@ -710,7 +767,8 @@ impl LocalGcWorker { true } } - // entry went wrong, log and skip it + // Entry went wrong. Keep listing so the error can be propagated below + // instead of returning a partial listing as success. Err(err) => { warn!("Failed to list entry: {}", err); true @@ -747,7 +805,9 @@ impl LocalGcWorker { // Collect all entries from the channel let mut all_entries = vec![]; while let Some(stream) = rx.recv().await { - all_entries.extend(stream.into_iter().filter_map(Result::ok)); + for entry in stream { + all_entries.push(entry.context(OpenDalSnafu)?); + } } Ok(all_entries) diff --git a/src/store-api/src/storage/file.rs b/src/store-api/src/storage/file.rs index 8acc0e3b93..0dcfcdc8cd 100644 --- a/src/store-api/src/storage/file.rs +++ b/src/store-api/src/storage/file.rs @@ -108,7 +108,8 @@ pub struct FileRefsManifest { #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct GcReport { - /// deleted files per region + /// Deleted SST/parquet file ids per region. Index-only deletions are reported via + /// `deleted_indexes` because a naked `FileId` cannot distinguish index versions. /// TODO(discord9): change to `RemovedFile`? pub deleted_files: HashMap>, pub deleted_indexes: HashMap>,