fix: include index files in GC listing (#8327)

* fix: include index files in GC listing

Signed-off-by: discord9 <discord9@163.com>

* chore: filter GC index listing to puffins

Signed-off-by: discord9 <discord9@163.com>

* chore: simplify GC index listing stream

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-24 12:40:10 +08:00
committed by GitHub
parent 019a913f7c
commit 758a166325
2 changed files with 70 additions and 9 deletions

View File

@@ -30,7 +30,7 @@ use common_telemetry::tracing::Instrument as _;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use itertools::Itertools;
use object_store::{Entry, Lister};
use object_store::{Entry, ErrorKind, Lister};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt as _, ensure};
use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
@@ -308,7 +308,14 @@ impl LocalGcWorker {
.iter()
.filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
.collect_vec();
deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
let data_files = files
.into_iter()
.filter_map(|f| match f {
RemovedFile::File(file_id, _) => Some(file_id),
RemovedFile::Index(_, _) => None,
})
.collect();
deleted_files.insert(*region_id, data_files);
deleted_indexes.insert(*region_id, index_files);
processed_regions.insert(*region_id);
debug!(
@@ -671,22 +678,72 @@ impl LocalGcWorker {
})?;
let lister_cnt = listers.len();
// Step 2: Concurrently list all files in the region directory
let all_entries = self
// Step 2: Concurrently list all parquet files in the region root directory
let mut all_entries = self
.list_region_files_concurrent(listers)
.await
.inspect_err(|_| {
GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
})?;
let cnt = all_entries.len();
let root_cnt = all_entries.len();
// Step 2b: Flat-list region_dir/index/ for puffin files.
// This is NOT a recursive listing — we only list the index/
// subdirectory to avoid scanning nested dirs/staging/blob/cache.
let index_entries = self
.list_region_index_files(region_id)
.await
.inspect_err(|_| {
GC_ERRORS_TOTAL.with_label_values(&["list_failed"]).inc();
})?;
let index_cnt = index_entries.len();
all_entries.extend(index_entries);
info!(
"gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
"gc: full listing mode cost {} secs using {lister_cnt} lister for root={root_cnt} index={index_cnt} files in region {}.",
start.elapsed().as_secs_f64(),
region_id
);
Ok(all_entries)
}
/// Flat-list puffin files from `region_dir/index/`.
/// If the index directory does not exist, returns an empty vec without error.
/// Only `.puffin` files (not subdirectories) are included.
async fn list_region_index_files(&self, region_id: RegionId) -> Result<Vec<Entry>> {
let region_dir = self.access_layer.build_region_dir(region_id);
let index_dir = object_store::util::join_dir(&region_dir, "index");
let mut lister = match self
.access_layer
.object_store()
.lister_with(&index_dir)
.await
{
Ok(l) => l,
Err(e) if e.kind() == ErrorKind::NotFound => {
// Index dir may not exist — that's fine, just log and return empty.
// object-store backends (especially filesystem) may error on
// non-existent directories.
debug!(
"Index directory not found for region {}: {}. Treating as empty.",
region_id, e
);
return Ok(vec![]);
}
Err(e) => return Err(e).context(OpenDalSnafu),
};
let mut entries = Vec::new();
while let Some(entry) = lister.next().await {
let entry = entry.context(OpenDalSnafu)?;
if entry.metadata().is_file() && entry.name().ends_with(".puffin") {
entries.push(entry);
}
}
Ok(entries)
}
/// Concurrently list all files in the region directory using the provided listers.
/// Returns a vector of all file entries found across all partitions.
async fn list_region_files_concurrent(
@@ -710,7 +767,8 @@ impl LocalGcWorker {
true
}
}
// entry went wrong, log and skip it
// Entry went wrong. Keep listing so the error can be propagated below
// instead of returning a partial listing as success.
Err(err) => {
warn!("Failed to list entry: {}", err);
true
@@ -747,7 +805,9 @@ impl LocalGcWorker {
// Collect all entries from the channel
let mut all_entries = vec![];
while let Some(stream) = rx.recv().await {
all_entries.extend(stream.into_iter().filter_map(Result::ok));
for entry in stream {
all_entries.push(entry.context(OpenDalSnafu)?);
}
}
Ok(all_entries)

View File

@@ -108,7 +108,8 @@ pub struct FileRefsManifest {
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GcReport {
/// deleted files per region
/// Deleted SST/parquet file ids per region. Index-only deletions are reported via
/// `deleted_indexes` because a naked `FileId` cannot distinguish index versions.
/// TODO(discord9): change to `RemovedFile`?
pub deleted_files: HashMap<RegionId, Vec<FileId>>,
pub deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,