feat: gc versioned index (#7412)

* feat: add index version to file ref

Signed-off-by: discord9 <discord9@163.com>

* refactor wip

Signed-off-by: discord9 <discord9@163.com>

* wip

Signed-off-by: discord9 <discord9@163.com>

* update gc worker

Signed-off-by: discord9 <discord9@163.com>

* stuff

Signed-off-by: discord9 <discord9@163.com>

* gc report for index files

Signed-off-by: discord9 <discord9@163.com>

* fix: type

Signed-off-by: discord9 <discord9@163.com>

* stuff

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* chore: metrics

Signed-off-by: discord9 <discord9@163.com>

* typo

Signed-off-by: discord9 <discord9@163.com>

* typo

Signed-off-by: discord9 <discord9@163.com>

* chore: naming

Signed-off-by: discord9 <discord9@163.com>

* docs: update explain

Signed-off-by: discord9 <discord9@163.com>

* test: parse file id/type from file path

Signed-off-by: discord9 <discord9@163.com>

* chore: change parse method visibility to crate

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-12-24 11:07:53 +08:00
committed by GitHub
parent 17e2b98132
commit dec0d522f8
17 changed files with 552 additions and 171 deletions

View File

@@ -834,7 +834,7 @@ impl InstructionReply {
mod tests {
use std::collections::HashSet;
use store_api::storage::FileId;
use store_api::storage::{FileId, FileRef};
use super::*;
@@ -1209,12 +1209,14 @@ mod tests {
let mut manifest = FileRefsManifest::default();
let r0 = RegionId::new(1024, 1);
let r1 = RegionId::new(1024, 2);
manifest
.file_refs
.insert(r0, HashSet::from([FileId::random()]));
manifest
.file_refs
.insert(r1, HashSet::from([FileId::random()]));
manifest.file_refs.insert(
r0,
HashSet::from([FileRef::new(r0, FileId::random(), None)]),
);
manifest.file_refs.insert(
r1,
HashSet::from([FileRef::new(r1, FileId::random(), None)]),
);
manifest.manifest_version.insert(r0, 10);
manifest.manifest_version.insert(r1, 20);

View File

@@ -194,7 +194,7 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
}
// Send GetFileRefs instructions to each datanode
let mut all_file_refs: HashMap<RegionId, HashSet<FileId>> = HashMap::new();
let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
let mut all_manifest_versions = HashMap::new();
for (peer, regions) in datanode2query_regions {

View File

@@ -53,6 +53,7 @@ pub fn new_empty_report_with(region_ids: impl IntoIterator<Item = RegionId>) ->
}
GcReport {
deleted_files,
deleted_indexes: HashMap::new(),
need_retry_regions: HashSet::new(),
}
}

View File

@@ -454,7 +454,11 @@ async fn test_region_gc_concurrency_with_retryable_errors() {
(
region_id,
// mock the actual gc report with deleted files when it succeeds (even with no files to delete)
GcReport::new(HashMap::from([(region_id, vec![])]), HashSet::new()),
GcReport::new(
HashMap::from([(region_id, vec![])]),
Default::default(),
HashSet::new(),
),
)
})
.collect();

View File

@@ -20,7 +20,7 @@ use common_meta::datanode::RegionManifestInfo;
use common_meta::peer::Peer;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionRole;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, RegionId};
use crate::gc::mock::{
MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat, new_empty_report_with,
@@ -60,7 +60,10 @@ async fn test_gc_regions_failure_handling() {
let file_refs = FileRefsManifest {
manifest_version: HashMap::from([(region_id, 1)]),
file_refs: HashMap::from([(region_id, HashSet::from([FileId::random()]))]),
file_refs: HashMap::from([(
region_id,
HashSet::from([FileRef::new(region_id, FileId::random(), None)]),
)]),
};
let ctx = Arc::new(

View File

@@ -356,8 +356,7 @@ impl BatchGcProcedure {
}
// Send GetFileRefs instructions to each datanode
let mut all_file_refs: HashMap<RegionId, HashSet<store_api::storage::FileId>> =
HashMap::new();
let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
let mut all_manifest_versions = HashMap::new();
for (peer, regions) in datanode2query_regions {

View File

@@ -727,7 +727,7 @@ impl fmt::Display for FileType {
impl FileType {
/// Parses the file type from string.
fn parse(s: &str) -> Option<FileType> {
pub(crate) fn parse(s: &str) -> Option<FileType> {
match s {
"parquet" => Some(FileType::Parquet),
"puffin" => Some(FileType::Puffin(0)),

View File

@@ -28,28 +28,62 @@ use std::time::Duration;
use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use itertools::Itertools;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::FileType;
use crate::config::MitoConfig;
use crate::error::{
DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::action::RegionManifest;
use crate::metrics::GC_DELETE_FILE_CNT;
use crate::manifest::action::{RegionManifest, RemovedFile};
use crate::metrics::{GC_DELETE_FILE_CNT, GC_ORPHANED_INDEX_FILES, GC_SKIPPED_UNPARSABLE_FILES};
use crate::region::{MitoRegionRef, RegionRoleState};
use crate::sst::file::delete_files;
use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index};
use crate::sst::location::{self};
#[cfg(test)]
mod worker_test;
/// Helper function to determine if a file should be deleted based on common logic
/// shared between Parquet and Puffin file types.
fn should_delete_file(
is_in_manifest: bool,
is_in_tmp_ref: bool,
is_linger: bool,
is_eligible_for_delete: bool,
entry: &Entry,
unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
) -> bool {
let is_known = is_linger || is_eligible_for_delete;
let is_unknown_linger_time_exceeded = || {
// if the file's expel time is unknown (because it does not appear in the delta manifest), we keep it for a while
// using its last modified time
// note that unknown files use a different lingering time
entry
.metadata()
.last_modified()
.map(|t| t < unknown_file_may_linger_until)
.unwrap_or(false)
};
!is_in_manifest
&& !is_in_tmp_ref
&& if is_known {
is_eligible_for_delete
} else {
is_unknown_linger_time_exceeded()
}
}
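
A standalone mirror of the decision table above, with the object-store `Entry` last-modified lookup replaced by a precomputed boolean so the cases can be exercised directly; this illustrates the predicate rather than reusing the worker's types:

#[test]
fn should_delete_decision_table() {
    // Same logic as should_delete_file, with the Entry lookup precomputed.
    fn should_delete(
        is_in_manifest: bool,
        is_in_tmp_ref: bool,
        is_linger: bool,
        is_eligible_for_delete: bool,
        unknown_linger_exceeded: bool,
    ) -> bool {
        let is_known = is_linger || is_eligible_for_delete;
        !is_in_manifest
            && !is_in_tmp_ref
            && if is_known {
                is_eligible_for_delete
            } else {
                unknown_linger_exceeded
            }
    }

    // referenced by the manifest or a tmp ref: never deleted
    assert!(!should_delete(true, false, false, true, true));
    assert!(!should_delete(false, true, false, true, true));
    // known and past its lingering time: deleted
    assert!(should_delete(false, false, false, true, false));
    // known but still lingering: kept
    assert!(!should_delete(false, false, true, false, true));
    // unknown: kept until its last-modified time passes the unknown-file window
    assert!(!should_delete(false, false, false, false, false));
    assert!(should_delete(false, false, false, false, true));
}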
/// Limit the amount of concurrent GC jobs on the datanode
pub struct GcLimiter {
pub gc_job_limit: Arc<tokio::sync::Semaphore>,
@@ -208,7 +242,7 @@ impl LocalGcWorker {
}
/// Get tmp ref files for all current regions
pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileId>>> {
pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
let mut tmp_ref_files = HashMap::new();
for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
tmp_ref_files
@@ -230,6 +264,7 @@ impl LocalGcWorker {
let now = std::time::Instant::now();
let mut deleted_files = HashMap::new();
let mut deleted_indexes = HashMap::new();
let tmp_ref_files = self.read_tmp_ref_files().await?;
for (region_id, region) in &self.regions {
let per_region_time = std::time::Instant::now();
@@ -247,7 +282,12 @@ impl LocalGcWorker {
.cloned()
.unwrap_or_else(HashSet::new);
let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
deleted_files.insert(*region_id, files);
let index_files = files
.iter()
.filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
.collect_vec();
deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
deleted_indexes.insert(*region_id, index_files);
debug!(
"GC for region {} took {} secs.",
region_id,
@@ -260,6 +300,7 @@ impl LocalGcWorker {
);
let report = GcReport {
deleted_files,
deleted_indexes,
need_retry_regions: HashSet::new(),
};
Ok(report)
@@ -282,8 +323,8 @@ impl LocalGcWorker {
pub async fn do_region_gc(
&self,
region: MitoRegionRef,
tmp_ref_files: &HashSet<FileId>,
) -> Result<Vec<FileId>> {
tmp_ref_files: &HashSet<FileRef>,
) -> Result<Vec<RemovedFile>> {
let region_id = region.region_id();
debug!("Doing gc for region {}", region_id);
@@ -311,64 +352,83 @@ impl LocalGcWorker {
.map(|s| s.len())
.sum::<usize>();
let in_used: HashSet<FileId> = current_files
.keys()
.cloned()
.chain(tmp_ref_files.clone().into_iter())
.collect();
let in_manifest = current_files
.iter()
.map(|(file_id, meta)| (*file_id, meta.index_version()))
.collect::<HashMap<_, _>>();
let unused_files = self
.list_to_be_deleted_files(region_id, &in_used, recently_removed_files, all_entries)
let in_tmp_ref = tmp_ref_files
.iter()
.map(|file_ref| (file_ref.file_id, file_ref.index_version))
.collect::<HashSet<_>>();
let deletable_files = self
.list_to_be_deleted_files(
region_id,
&in_manifest,
&in_tmp_ref,
recently_removed_files,
all_entries,
)
.await?;
let unused_file_cnt = unused_files.len();
let unused_file_cnt = deletable_files.len();
debug!(
"gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, In-used files: {}, recently removed files: {}, Unused files to delete: {} ",
"gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {} ",
current_files.len(),
tmp_ref_files.len(),
in_used.len(),
removed_file_cnt,
unused_files.len()
deletable_files.len()
);
// TODO(discord9): for now, ignore async index files as their design is not stable; improve this once
// the index file design is stable
let file_pairs: Vec<(FileId, u64)> =
unused_files.iter().map(|file_id| (*file_id, 0)).collect();
// TODO(discord9): the gc worker needs another major refactor to support versioned index files
debug!(
"Found {} unused index files to delete for region {}",
file_pairs.len(),
deletable_files.len(),
region_id
);
self.delete_files(region_id, &file_pairs).await?;
self.delete_files(region_id, &deletable_files).await?;
debug!(
"Successfully deleted {} unused files for region {}",
unused_file_cnt, region_id
);
// TODO(discord9): update region manifest about deleted files
self.update_manifest_removed_files(&region, unused_files.clone())
self.update_manifest_removed_files(&region, deletable_files.clone())
.await?;
Ok(unused_files)
Ok(deletable_files)
}
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, u64)]) -> Result<()> {
async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
let mut index_ids = vec![];
let file_pairs = removed_files
.iter()
.filter_map(|f| match f {
RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
RemovedFile::Index(file_id, index_version) => {
let region_index_id =
RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
index_ids.push(region_index_id);
None
}
})
.collect_vec();
delete_files(
region_id,
file_ids,
&file_pairs,
true,
&self.access_layer,
&self.cache_manager,
)
.await?;
for index_id in index_ids {
delete_index(index_id, &self.access_layer, &self.cache_manager).await?;
}
// FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate; there is no clean way to fix that for now
GC_DELETE_FILE_CNT.add(file_ids.len() as i64);
GC_DELETE_FILE_CNT.add(removed_files.len() as i64);
Ok(())
}
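
A standalone mirror of the partition above, assuming `IndexVersion` is a `u64` alias (as the tests in this commit suggest): data files keep their `(FileId, version)` pair with `None` mapped to 0, exactly as in the code, while stale indexes are routed to the `delete_index` path.

// Illustration only: how a mixed RemovedFile batch splits in delete_files.
// Assumes `use store_api::storage::{FileId, IndexVersion};` and
// `use crate::manifest::action::RemovedFile;`.
fn partition(removed: &[RemovedFile]) -> (Vec<(FileId, u64)>, Vec<(FileId, IndexVersion)>) {
    let mut stale_indexes = vec![];
    let data_files = removed
        .iter()
        .filter_map(|f| match f {
            RemovedFile::File(id, v) => Some((*id, v.unwrap_or(0))),
            RemovedFile::Index(id, v) => {
                stale_indexes.push((*id, *v));
                None
            }
        })
        .collect();
    (data_files, stale_indexes)
}
// e.g. [File(f1, Some(2)), File(f2, None), Index(f3, 1)]
//   => data_files [(f1, 2), (f2, 0)], stale_indexes [(f3, 1)]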
@@ -377,7 +437,7 @@ impl LocalGcWorker {
async fn update_manifest_removed_files(
&self,
region: &MitoRegionRef,
deleted_files: Vec<FileId>,
deleted_files: Vec<RemovedFile>,
) -> Result<()> {
let deleted_file_cnt = deleted_files.len();
debug!(
@@ -403,12 +463,12 @@ impl LocalGcWorker {
pub async fn get_removed_files_expel_times(
&self,
region_manifest: &Arc<RegionManifest>,
) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
let mut ret = BTreeMap::new();
for files in &region_manifest.removed_files.removed_files {
let expel_time = Timestamp::new_millisecond(files.removed_at);
let set = ret.entry(expel_time).or_insert_with(HashSet::new);
set.extend(files.file_ids.iter().cloned());
set.extend(files.files.iter().cloned());
}
Ok(ret)
@@ -535,75 +595,136 @@ impl LocalGcWorker {
Ok(all_entries)
}
/// Filter files to determine which ones can be deleted based on usage status and lingering time.
/// Returns the `RemovedFile`s that are safe to delete.
fn filter_deletable_files(
&self,
entries: Vec<Entry>,
in_use_filenames: &HashSet<FileId>,
may_linger_filenames: &HashSet<&FileId>,
eligible_for_removal: &HashSet<&FileId>,
in_manifest: &HashMap<FileId, Option<IndexVersion>>,
in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
may_linger_files: &HashSet<&RemovedFile>,
eligible_for_delete: &HashSet<&RemovedFile>,
unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
) -> (Vec<FileId>, HashSet<FileId>) {
let mut all_unused_files_ready_for_delete = vec![];
let mut all_in_exist_linger_files = HashSet::new();
) -> Vec<RemovedFile> {
let mut ready_for_delete = vec![];
// group everything by file id for easier checking
let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
in_tmp_ref
.iter()
.fold(HashMap::new(), |mut acc, (file, version)| {
let indices = acc.entry(*file).or_default();
if let Some(version) = version {
indices.insert(*version);
}
acc
});
let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
.iter()
.fold(HashMap::new(), |mut acc, file| {
let indices = acc.entry(file.file_id()).or_default();
indices.insert(file);
acc
});
let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
.iter()
.fold(HashMap::new(), |mut acc, file| {
let indices = acc.entry(file.file_id()).or_default();
indices.insert(file);
acc
});
for entry in entries {
let file_id = match location::parse_file_id_from_path(entry.name()) {
Ok(file_id) => file_id,
let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
Ok((file_id, file_type)) => (file_id, file_type),
Err(err) => {
error!(err; "Failed to parse file id from path: {}", entry.name());
// if we can't parse the file id, it's not an sst or index file;
// we shouldn't delete it because we don't know what it is
GC_SKIPPED_UNPARSABLE_FILES.inc();
continue;
}
};
if may_linger_filenames.contains(&file_id) {
all_in_exist_linger_files.insert(file_id);
}
let should_delete = match file_type {
FileType::Parquet => {
let is_in_manifest = in_manifest.contains_key(&file_id);
let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
let is_linger = may_linger_files.contains_key(&file_id);
let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);
let should_delete = !in_use_filenames.contains(&file_id)
&& !may_linger_filenames.contains(&file_id)
&& {
if !eligible_for_removal.contains(&file_id) {
// if the file's expel time is unknown(because not appear in delta manifest), we keep it for a while
// using it's last modified time
// notice unknown files use a different lingering time
entry
.metadata()
.last_modified()
.map(|t| t < unknown_file_may_linger_until)
.unwrap_or(false)
} else {
// if the file did appear in manifest delta(and passes previous predicate), we can delete it immediately
true
}
};
should_delete_file(
is_in_manifest,
is_in_tmp_ref,
is_linger,
is_eligible_for_delete,
&entry,
unknown_file_may_linger_until,
)
}
FileType::Puffin(version) => {
// note that we need to check both the file id and the version
let is_in_manifest = in_manifest
.get(&file_id)
.map(|opt_ver| *opt_ver == Some(version))
.unwrap_or(false);
let is_in_tmp_ref = in_tmp_ref
.get(&file_id)
.map(|versions| versions.contains(&version))
.unwrap_or(false);
let is_linger = may_linger_files
.get(&file_id)
.map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
.unwrap_or(false);
let is_eligible_for_delete = eligible_for_delete
.get(&file_id)
.map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
.unwrap_or(false);
should_delete_file(
is_in_manifest,
is_in_tmp_ref,
is_linger,
is_eligible_for_delete,
&entry,
unknown_file_may_linger_until,
)
}
};
if should_delete {
all_unused_files_ready_for_delete.push(file_id);
let removed_file = match file_type {
FileType::Parquet => {
// note: we don't track the index version for parquet files here,
// since entries come from listing and we can't get the index version from the path
RemovedFile::File(file_id, None)
}
FileType::Puffin(version) => {
GC_ORPHANED_INDEX_FILES.inc();
RemovedFile::Index(file_id, version)
}
};
ready_for_delete.push(removed_file);
}
}
(all_unused_files_ready_for_delete, all_in_exist_linger_files)
ready_for_delete
}
/// Concurrently list unused files in the region dir
/// because there may be a lot of files in the region dir
/// and listing them may take a long time.
/// List files to be deleted based on their presence in the manifest, temporary references, and recently removed files.
/// Returns a vector of `RemovedFile`s that are eligible for deletion.
///
/// When `full_file_listing` is false, this method will only delete (a subset of) files tracked in
/// `recently_removed_files`, which significantly
/// improves performance. When `full_file_listing` is true, it reads from `all_entries` to find
/// and delete orphan files (files not tracked in the manifest).
///
/// When `full_file_listing` is false, this method will only delete files tracked in
/// `recently_removed_files` without performing expensive list operations, which significantly
/// improves performance. When `full_file_listing` is true, it performs a full listing to
/// find and delete orphan files.
pub async fn list_to_be_deleted_files(
&self,
region_id: RegionId,
in_used: &HashSet<FileId>,
recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
in_manifest: &HashMap<FileId, Option<IndexVersion>>,
in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
all_entries: Vec<Entry>,
) -> Result<Vec<FileId>> {
) -> Result<Vec<RemovedFile>> {
let now = chrono::Utc::now();
let may_linger_until = self
.opt
@@ -634,8 +755,10 @@ impl LocalGcWorker {
};
debug!("may_linger_files: {:?}", may_linger_files);
let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();
let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();
// known files (tracked in the removed files field) that are eligible for removal
// (i.e. past their lingering time)
let eligible_for_removal = recently_removed_files
.values()
.flatten()
@@ -646,12 +769,24 @@ impl LocalGcWorker {
if !self.full_file_listing {
// Only delete files that:
// 1. Are in recently_removed_files (tracked in manifest)
// 2. Are not in use
// 2. Are not in use (in manifest or tmp ref)
// 3. Have passed the lingering time
let files_to_delete: Vec<FileId> = eligible_for_removal
let files_to_delete: Vec<RemovedFile> = eligible_for_removal
.iter()
.filter(|file_id| !in_used.contains(*file_id))
.map(|&f| *f)
.filter(|file_id| {
let in_use = match file_id {
RemovedFile::File(file_id, index_version) => {
in_manifest.get(file_id) == Some(index_version)
|| in_tmp_ref.contains(&(*file_id, *index_version))
}
RemovedFile::Index(file_id, index_version) => {
in_manifest.get(file_id) == Some(&Some(*index_version))
|| in_tmp_ref.contains(&(*file_id, Some(*index_version)))
}
};
!in_use
})
.map(|&f| f.clone())
.collect();
info!(
@@ -666,16 +801,14 @@ impl LocalGcWorker {
// Full file listing mode: get the full list of files from object store
// Step 3: Filter files to determine which ones can be deleted
let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
.filter_deletable_files(
all_entries,
in_used,
&may_linger_filenames,
&eligible_for_removal,
unknown_file_may_linger_until,
);
debug!("All in exist linger files: {:?}", all_in_exist_linger_files);
let all_unused_files_ready_for_delete = self.filter_deletable_files(
all_entries,
in_manifest,
in_tmp_ref,
&all_may_linger_files,
&eligible_for_removal,
unknown_file_may_linger_until,
);
Ok(all_unused_files_ready_for_delete)
}

View File

@@ -19,12 +19,13 @@ use api::v1::Rows;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionEngine as _;
use store_api::region_request::{RegionCompactRequest, RegionRequest};
use store_api::storage::{FileRefsManifest, RegionId};
use store_api::storage::{FileRef, FileRefsManifest, RegionId};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::engine::compaction_test::{delete_and_flush, put_and_flush};
use crate::gc::{GcConfig, LocalGcWorker};
use crate::manifest::action::RemovedFile;
use crate::region::MitoRegionRef;
use crate::test_util::{
CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, rows_schema,
@@ -120,9 +121,9 @@ async fn test_gc_worker_basic_truncate() {
let manifest = region.manifest_ctx.manifest().await;
assert!(
manifest.removed_files.removed_files[0]
.file_ids
.contains(&to_be_deleted_file_id)
&& manifest.removed_files.removed_files[0].file_ids.len() == 1
.files
.contains(&RemovedFile::File(to_be_deleted_file_id, None))
&& manifest.removed_files.removed_files[0].files.len() == 1
&& manifest.files.is_empty(),
"Manifest after truncate: {:?}",
manifest
@@ -214,9 +215,9 @@ async fn test_gc_worker_truncate_with_ref() {
let manifest = region.manifest_ctx.manifest().await;
assert!(
manifest.removed_files.removed_files[0]
.file_ids
.contains(&to_be_deleted_file_id)
&& manifest.removed_files.removed_files[0].file_ids.len() == 1
.files
.contains(&RemovedFile::File(to_be_deleted_file_id, None))
&& manifest.removed_files.removed_files[0].files.len() == 1
&& manifest.files.is_empty(),
"Manifest after truncate: {:?}",
manifest
@@ -225,7 +226,11 @@ async fn test_gc_worker_truncate_with_ref() {
let regions = BTreeMap::from([(region_id, region.clone())]);
let file_ref_manifest = FileRefsManifest {
file_refs: [(region_id, HashSet::from([to_be_deleted_file_id]))].into(),
file_refs: [(
region_id,
HashSet::from([FileRef::new(region_id, to_be_deleted_file_id, None)]),
)]
.into(),
manifest_version: [(region_id, version)].into(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
@@ -235,7 +240,7 @@ async fn test_gc_worker_truncate_with_ref() {
let manifest = region.manifest_ctx.manifest().await;
assert!(
manifest.removed_files.removed_files[0].file_ids.len() == 1 && manifest.files.is_empty(),
manifest.removed_files.removed_files[0].files.len() == 1 && manifest.files.is_empty(),
"Manifest: {:?}",
manifest
);
@@ -300,7 +305,7 @@ async fn test_gc_worker_basic_compact() {
let region = engine.get_region(region_id).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.removed_files.removed_files[0].file_ids.len(), 3);
assert_eq!(manifest.removed_files.removed_files[0].files.len(), 3);
let version = manifest.manifest_version;
@@ -376,7 +381,7 @@ async fn test_gc_worker_compact_with_ref() {
let region = engine.get_region(region_id).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.removed_files.removed_files[0].file_ids.len(), 3);
assert_eq!(manifest.removed_files.removed_files[0].files.len(), 3);
let version = manifest.manifest_version;
@@ -385,9 +390,12 @@ async fn test_gc_worker_compact_with_ref() {
file_refs: HashMap::from([(
region_id,
manifest.removed_files.removed_files[0]
.file_ids
.files
.iter()
.cloned()
.map(|removed_file| match removed_file {
RemovedFile::File(file_id, v) => FileRef::new(region_id, *file_id, *v),
RemovedFile::Index(file_id, v) => FileRef::new(region_id, *file_id, Some(*v)),
})
.collect(),
)]),
manifest_version: [(region_id, version)].into(),

View File

@@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{FileId, RegionId, SequenceNumber};
use store_api::storage::{FileId, IndexVersion, RegionId, SequenceNumber};
use strum::Display;
use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
@@ -193,17 +193,27 @@ impl RegionManifestBuilder {
pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
self.manifest_version = manifest_version;
let mut removed_files = vec![];
for file in edit.files_to_add {
self.files.insert(file.file_id, file);
if let Some(old_file) = self.files.insert(file.file_id, file.clone())
&& let Some(old_index) = old_file.index_version()
&& !old_file.is_index_up_to_date(&file)
{
// The old file has an index that is now outdated.
removed_files.push(RemovedFile::Index(old_file.file_id, old_index));
}
}
self.removed_files.add_removed_files(
removed_files.extend(
edit.files_to_remove
.iter()
.map(|meta| meta.file_id)
.collect(),
edit.timestamp_ms
.unwrap_or_else(|| Utc::now().timestamp_millis()),
.map(|f| RemovedFile::File(f.file_id, f.index_version())),
);
let at = edit
.timestamp_ms
.unwrap_or_else(|| Utc::now().timestamp_millis());
self.removed_files.add_removed_files(removed_files, at);
for file in edit.files_to_remove {
self.files.remove(&file.file_id);
}
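
In prose: re-adding a file whose index version moved forward leaves the old puffin orphaned, so `apply_edit` queues it as a `RemovedFile::Index` alongside the regular `files_to_remove`. A small sketch of the scenario, using a hypothetical `meta_with_index(file_id, version)` constructor (building a full `FileMeta` is elided):

// Hypothetical: meta_with_index builds a FileMeta with a non-empty
// available_indexes and the given index_version.
let old = meta_with_index(file_id, 0);
let new = meta_with_index(file_id, 1);
assert_eq!(old.index_version(), Some(0));
assert!(!old.is_index_up_to_date(&new)); // 0 < 1: the old index is stale
// apply_edit therefore records the version-0 puffin for deletion:
let queued = RemovedFile::Index(old.file_id, 0);
assert_eq!(queued.index_version(), Some(0));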
@@ -236,7 +246,10 @@ impl RegionManifestBuilder {
self.flushed_sequence = truncated_sequence;
self.truncated_entry_id = Some(truncated_entry_id);
self.removed_files.add_removed_files(
self.files.values().map(|meta| meta.file_id).collect(),
self.files
.values()
.map(|f| RemovedFile::File(f.file_id, f.index_version()))
.collect(),
truncate
.timestamp_ms
.unwrap_or_else(|| Utc::now().timestamp_millis()),
@@ -245,7 +258,10 @@ impl RegionManifestBuilder {
}
TruncateKind::Partial { files_to_remove } => {
self.removed_files.add_removed_files(
files_to_remove.iter().map(|meta| meta.file_id).collect(),
files_to_remove
.iter()
.map(|f| RemovedFile::File(f.file_id, f.index_version()))
.collect(),
truncate
.timestamp_ms
.unwrap_or_else(|| Utc::now().timestamp_millis()),
@@ -295,20 +311,22 @@ pub struct RemovedFilesRecord {
impl RemovedFilesRecord {
/// Clear the actually deleted files from the list of removed files
pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
for files in self.removed_files.iter_mut() {
files.file_ids.retain(|fid| !deleted_file_set.contains(fid));
files
.files
.retain(|removed| !deleted_file_set.contains(removed));
}
self.removed_files.retain(|fs| !fs.file_ids.is_empty());
self.removed_files.retain(|fs| !fs.files.is_empty());
}
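
The effect of `clear_deleted_files`, sketched as a test: each record drops the entries that were actually deleted, and records that end up empty disappear entirely (assuming `HashSet` and `FileId::random()` from the surrounding code):

#[test]
fn clear_deleted_files_sketch() {
    let f1 = FileId::random();
    let f2 = FileId::random();
    let mut record = RemovedFilesRecord {
        removed_files: vec![
            RemovedFiles {
                removed_at: 0,
                files: HashSet::from([RemovedFile::File(f1, None)]),
            },
            RemovedFiles {
                removed_at: 1,
                files: HashSet::from([RemovedFile::File(f2, None), RemovedFile::Index(f2, 1)]),
            },
        ],
    };
    record.clear_deleted_files(vec![RemovedFile::File(f1, None), RemovedFile::Index(f2, 1)]);
    // the first record emptied out and was dropped; the second keeps f2's data file
    assert_eq!(record.removed_files.len(), 1);
    assert_eq!(
        record.removed_files[0].files,
        HashSet::from([RemovedFile::File(f2, None)])
    );
}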
pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
let cnt = self
.removed_files
.iter()
.map(|r| r.file_ids.len() as u64)
.map(|r| r.files.len() as u64)
.sum();
stats
.file_removed_cnt
@@ -322,18 +340,42 @@ pub struct RemovedFiles {
/// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
pub removed_at: i64,
/// The set of files that are removed.
pub file_ids: HashSet<FileId>,
pub files: HashSet<RemovedFile>,
}
/// A removed file, which can be a data file (optionally paired with an index file) or an outdated index file.
#[derive(Serialize, Deserialize, Hash, Clone, Debug, PartialEq, Eq)]
pub enum RemovedFile {
File(FileId, Option<IndexVersion>),
Index(FileId, IndexVersion),
}
impl RemovedFile {
pub fn file_id(&self) -> FileId {
match self {
RemovedFile::File(file_id, _) => *file_id,
RemovedFile::Index(file_id, _) => *file_id,
}
}
pub fn index_version(&self) -> Option<IndexVersion> {
match self {
RemovedFile::File(_, index_version) => *index_version,
RemovedFile::Index(_, index_version) => Some(*index_version),
}
}
}
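
A small usage sketch of the accessors: both variants expose the file id uniformly, while `index_version()` is `None` only for a data file that never had an index:

#[test]
fn removed_file_accessor_sketch() {
    let file_id = FileId::random();
    let data_only = RemovedFile::File(file_id, None);
    let data_with_index = RemovedFile::File(file_id, Some(3));
    let stale_index = RemovedFile::Index(file_id, 2);

    assert_eq!(data_only.file_id(), file_id);
    assert_eq!(data_only.index_version(), None);
    assert_eq!(data_with_index.index_version(), Some(3));
    assert_eq!(stale_index.index_version(), Some(2));
}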
impl RemovedFilesRecord {
/// Add a record of files removed at the given timestamp.
pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
if file_ids.is_empty() {
pub fn add_removed_files(&mut self, removed: Vec<RemovedFile>, at: i64) {
if removed.is_empty() {
return;
}
let files = removed.into_iter().collect();
self.removed_files.push(RemovedFiles {
removed_at: at,
file_ids,
files,
});
}
@@ -738,10 +780,10 @@ mod tests {
removed_files: RemovedFilesRecord {
removed_files: vec![RemovedFiles {
removed_at: 0,
file_ids: HashSet::from([FileId::parse_str(
"4b220a70-2b03-4641-9687-b65d94641208",
)
.unwrap()]),
files: HashSet::from([RemovedFile::File(
FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
None,
)]),
}],
},
sst_format: FormatType::PrimaryKey,

View File

@@ -21,7 +21,6 @@ use futures::TryStreamExt;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::FileId;
use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
use crate::cache::manifest_cache::ManifestCache;
@@ -31,7 +30,7 @@ use crate::error::{
};
use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
RegionMetaAction, RegionMetaActionList,
RegionMetaAction, RegionMetaActionList, RemovedFile,
};
use crate::manifest::checkpointer::Checkpointer;
use crate::manifest::storage::{
@@ -589,7 +588,7 @@ impl RegionManifestManager {
}
/// Clear deleted files from the manifest's `removed_files` field without updating the version. Note that if the datanode exits before a checkpoint, a new manifest from opening the region may still contain these deleted files, which is acceptable for the gc process.
pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
let mut manifest = (*self.manifest()).clone();
manifest.removed_files.clear_deleted_files(deleted_files);
self.set_manifest(Arc::new(manifest));

View File

@@ -509,6 +509,20 @@ lazy_static! {
"mito gc deleted file count",
).unwrap();
/// Counter for the number of unparsable files skipped by GC.
pub static ref GC_SKIPPED_UNPARSABLE_FILES: IntCounter =
register_int_counter!(
"greptime_mito_gc_skipped_unparsable_files",
"mito gc skipped unparsable files count",
).unwrap();
/// Counter for the number of orphaned index files found by GC.
pub static ref GC_ORPHANED_INDEX_FILES: IntCounter =
register_int_counter!(
"greptime_mito_gc_orphaned_index_files",
"mito gc orphaned index files count",
).unwrap();
/// Total number of files downloaded during cache fill on region open.
pub static ref CACHE_FILL_DOWNLOADED_FILES: IntCounter = register_int_counter!(
"mito_cache_fill_downloaded_files",

View File

@@ -312,6 +312,14 @@ impl FileMeta {
!self.available_indexes.is_empty()
}
pub fn index_version(&self) -> Option<IndexVersion> {
if self.exists_index() {
Some(self.index_version)
} else {
None
}
}
/// Whether the index file is up-to-date comparing to another file meta.
pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
self.exists_index() && other.exists_index() && self.index_version >= other.index_version
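
Reading the predicate: an index is up to date only when both sides actually have one and ours is at least as new. A standalone mirror over two `Option`al versions, for illustration:

// Illustration of the comparison above, with exists_index() folded into
// Option-ness (None = no index on that side).
fn index_up_to_date(mine: Option<u64>, theirs: Option<u64>) -> bool {
    matches!((mine, theirs), (Some(m), Some(t)) if m >= t)
}

#[test]
fn index_up_to_date_sketch() {
    assert!(index_up_to_date(Some(2), Some(1))); // ours newer: up to date
    assert!(index_up_to_date(Some(1), Some(1))); // equal: up to date
    assert!(!index_up_to_date(Some(1), Some(2))); // ours older: stale
    assert!(!index_up_to_date(None, Some(1))); // we have no index
}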

View File

@@ -178,8 +178,8 @@ impl FilePurger for ObjectStoreFilePurger {
// if not on a local file system, inform the global file purger to remove the file reference instead.
// note that whether or not the file is actually deleted, we need to remove the reference,
// because the file is no longer in use either way.
// for the same reason, we don't care about index_outdated here.
self.file_ref_manager.remove_file(&file_meta);
// TODO(discord9): consider impl a .tombstone file to reduce files needed to list
}
fn new_file(&self, file_meta: &FileMeta) {

View File

@@ -91,7 +91,7 @@ impl FileReferenceManager {
// get from in memory file handles
for region_id in query_regions.iter().map(|r| r.region_id()) {
if let Some(files) = self.ref_file_set(region_id) {
ref_files.insert(region_id, files.into_iter().map(|f| f.file_id).collect());
ref_files.insert(region_id, files);
}
}
@@ -108,10 +108,17 @@ impl FileReferenceManager {
let manifest = related_region.manifest_ctx.manifest().await;
for meta in manifest.files.values() {
if queries.contains(&meta.region_id) {
// since gc can't happen concurrently with repartition
// (both the queries and related_region acquire the region read lock), there is no need to worry
// about the staging manifest during repartition here.
ref_files
.entry(meta.region_id)
.or_insert_with(HashSet::new)
.insert(meta.file_id);
.insert(FileRef::new(
meta.region_id,
meta.file_id,
meta.index_version(),
));
}
}
// not sure if the related region's manifest version is needed, but record it for now.
@@ -132,7 +139,11 @@ impl FileReferenceManager {
let region_id = file_meta.region_id;
let mut is_new = false;
{
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
let file_ref = FileRef::new(
file_meta.region_id,
file_meta.file_id,
file_meta.index_version(),
);
self.files_per_region
.entry(region_id)
.and_modify(|refs| {
@@ -157,7 +168,7 @@ impl FileReferenceManager {
/// If the reference count reaches zero, the file reference will be removed from the manager.
pub fn remove_file(&self, file_meta: &FileMeta) {
let region_id = file_meta.region_id;
let file_ref = FileRef::new(region_id, file_meta.file_id);
let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version());
let mut remove_table_entry = false;
let mut remove_file_ref = false;
@@ -247,13 +258,23 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
HashMap::from_iter([(
FileRef::new(
file_meta.region_id,
file_meta.file_id,
file_meta.index_version()
),
1
)])
);
file_ref_mgr.add_file(&file_meta);
let expected_region_ref_manifest =
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
let expected_region_ref_manifest = HashSet::from_iter([FileRef::new(
file_meta.region_id,
file_meta.file_id,
file_meta.index_version(),
)]);
assert_eq!(
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
@@ -266,7 +287,14 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
HashMap::from_iter([(
FileRef::new(
file_meta.region_id,
file_meta.file_id,
file_meta.index_version()
),
2
)])
);
assert_eq!(
@@ -282,7 +310,14 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
HashMap::from_iter([(
FileRef::new(
file_meta.region_id,
file_meta.file_id,
file_meta.index_version()
),
1
)])
);
assert_eq!(

View File

@@ -19,6 +19,7 @@ use store_api::path_utils::region_name;
use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};
use crate::cache::file_cache::FileType;
use crate::error::UnexpectedSnafu;
use crate::sst::file::{RegionFileId, RegionIndexId};
@@ -110,31 +111,29 @@ pub fn parse_index_file_info(filepath: &str) -> crate::error::Result<(FileId, u6
}
}
/// Get the file id and file type from an sst or index file path
pub fn parse_file_id_from_path(filepath: &str) -> crate::error::Result<FileId> {
pub fn parse_file_id_type_from_path(filepath: &str) -> crate::error::Result<(FileId, FileType)> {
let filename = filepath.rsplit('/').next().context(UnexpectedSnafu {
reason: format!("invalid file path: {}", filepath),
})?;
// split the filename on '.': the first part is the file id, the rest is the file type
let parts: Vec<&str> = filename.split('.').collect();
if parts.len() != 2 {
if parts.len() < 2 {
return UnexpectedSnafu {
reason: format!("invalid file name: {}", filename),
}
.fail();
}
if parts[1] != "parquet" && parts[1] != "puffin" {
return UnexpectedSnafu {
reason: format!("invalid file extension: {}", parts[1]),
}
.fail();
}
let file_id = parts[0];
FileId::parse_str(file_id).map_err(|e| {
let file_id = FileId::parse_str(file_id).map_err(|e| {
UnexpectedSnafu {
reason: format!("invalid file id: {}, err: {}", file_id, e),
}
.build()
})
})?;
let file_type = FileType::parse(parts[1..].join(".").as_str()).context(UnexpectedSnafu {
reason: format!("invalid file type in file name: {}", filename),
})?;
Ok((file_id, file_type))
}
#[cfg(test)]
@@ -220,4 +219,62 @@ mod tests {
assert_eq!(result.0.to_string(), file_id.to_string());
assert_eq!(result.1, 42);
}
#[test]
fn test_parse_file_id_type_from_path() {
use crate::cache::file_cache::FileType;
// Test parquet file
let file_id = FileId::random();
let path = format!("table_dir/1_0000000002/data/{}.parquet", file_id);
let result = parse_file_id_type_from_path(&path).unwrap();
assert_eq!(result.0.to_string(), file_id.to_string());
assert_eq!(result.1, FileType::Parquet);
// Test puffin file (legacy format, version 0)
let file_id = FileId::random();
let path = format!("table_dir/1_0000000002/index/{}.puffin", file_id);
let result = parse_file_id_type_from_path(&path).unwrap();
assert_eq!(result.0.to_string(), file_id.to_string());
assert_eq!(result.1, FileType::Puffin(0));
// Test versioned puffin file
let file_id = FileId::random();
let path = format!("table_dir/1_0000000002/index/{}.1.puffin", file_id);
let result = parse_file_id_type_from_path(&path).unwrap();
assert_eq!(result.0.to_string(), file_id.to_string());
assert_eq!(result.1, FileType::Puffin(1));
// Test with different path types
let file_id = FileId::random();
let path = format!("table_dir/1_0000000002/metadata/{}.parquet", file_id);
let result = parse_file_id_type_from_path(&path).unwrap();
assert_eq!(result.0.to_string(), file_id.to_string());
assert_eq!(result.1, FileType::Parquet);
// Test with bare path type
let file_id = FileId::random();
let path = format!("table_dir/1_0000000002/{}.parquet", file_id);
let result = parse_file_id_type_from_path(&path).unwrap();
assert_eq!(result.0.to_string(), file_id.to_string());
assert_eq!(result.1, FileType::Parquet);
// Test error cases
// Invalid file extension
let result = parse_file_id_type_from_path("table_dir/1_0000000002/data/test.invalid");
assert!(result.is_err());
// Invalid file ID
let result =
parse_file_id_type_from_path("table_dir/1_0000000002/data/invalid-file-id.parquet");
assert!(result.is_err());
// No file extension
let result = parse_file_id_type_from_path("table_dir/1_0000000002/data/test");
assert!(result.is_err());
// Empty filename
let result = parse_file_id_type_from_path("table_dir/1_0000000002/data/");
assert!(result.is_err());
}
}

View File

@@ -77,11 +77,16 @@ impl FromStr for FileId {
pub struct FileRef {
pub region_id: RegionId,
pub file_id: FileId,
pub index_version: Option<IndexVersion>,
}
impl FileRef {
pub fn new(region_id: RegionId, file_id: FileId) -> Self {
Self { region_id, file_id }
pub fn new(region_id: RegionId, file_id: FileId, index_version: Option<IndexVersion>) -> Self {
Self {
region_id,
file_id,
index_version,
}
}
}
@@ -89,7 +94,7 @@ impl FileRef {
/// Also record the manifest version when these tmp files are read.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileRefsManifest {
pub file_refs: HashMap<RegionId, HashSet<FileId>>,
pub file_refs: HashMap<RegionId, HashSet<FileRef>>,
/// Manifest version when this manifest was read for its files
pub manifest_version: HashMap<RegionId, ManifestVersion>,
}
@@ -97,7 +102,9 @@ pub struct FileRefsManifest {
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GcReport {
/// deleted files per region
/// TODO(discord9): change to `RemovedFile`?
pub deleted_files: HashMap<RegionId, Vec<FileId>>,
pub deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
/// Regions that need retry in next gc round, usually because their tmp ref files are outdated
pub need_retry_regions: HashSet<RegionId>,
}
@@ -105,10 +112,12 @@ pub struct GcReport {
impl GcReport {
pub fn new(
deleted_files: HashMap<RegionId, Vec<FileId>>,
deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
need_retry_regions: HashSet<RegionId>,
) -> Self {
Self {
deleted_files,
deleted_indexes,
need_retry_regions,
}
}
@@ -162,8 +171,12 @@ mod tests {
let mut manifest = FileRefsManifest::default();
let r0 = RegionId::new(1024, 1);
let r1 = RegionId::new(1024, 2);
manifest.file_refs.insert(r0, [FileId::random()].into());
manifest.file_refs.insert(r1, [FileId::random()].into());
manifest
.file_refs
.insert(r0, [FileRef::new(r0, FileId::random(), None)].into());
manifest
.file_refs
.insert(r1, [FileRef::new(r1, FileId::random(), None)].into());
manifest.manifest_version.insert(r0, 10);
manifest.manifest_version.insert(r1, 20);
@@ -171,4 +184,67 @@ mod tests {
let parsed: FileRefsManifest = serde_json::from_str(&json).unwrap();
assert_eq!(manifest, parsed);
}
#[test]
fn test_file_ref_new() {
let region_id = RegionId::new(1024, 1);
let file_id = FileId::random();
// Test with Some(index_version)
let index_version: IndexVersion = 42;
let file_ref = FileRef::new(region_id, file_id, Some(index_version));
assert_eq!(file_ref.region_id, region_id);
assert_eq!(file_ref.file_id, file_id);
assert_eq!(file_ref.index_version, Some(index_version));
// Test with None
let file_ref_none = FileRef::new(region_id, file_id, None);
assert_eq!(file_ref_none.region_id, region_id);
assert_eq!(file_ref_none.file_id, file_id);
assert_eq!(file_ref_none.index_version, None);
}
#[test]
fn test_file_ref_equality() {
let region_id = RegionId::new(1024, 1);
let file_id = FileId::random();
let file_ref1 = FileRef::new(region_id, file_id, Some(10));
let file_ref2 = FileRef::new(region_id, file_id, Some(10));
let file_ref3 = FileRef::new(region_id, file_id, Some(20));
let file_ref4 = FileRef::new(region_id, file_id, None);
assert_eq!(file_ref1, file_ref2);
assert_ne!(file_ref1, file_ref3);
assert_ne!(file_ref1, file_ref4);
assert_ne!(file_ref3, file_ref4);
// Test equality with Some(0) vs None
let file_ref_zero = FileRef::new(region_id, file_id, Some(0));
assert_ne!(file_ref_zero, file_ref4);
}
#[test]
fn test_file_ref_serialization() {
let region_id = RegionId::new(1024, 1);
let file_id = FileId::random();
// Test with Some(index_version)
let index_version: IndexVersion = 12345;
let file_ref = FileRef::new(region_id, file_id, Some(index_version));
let json = serde_json::to_string(&file_ref).unwrap();
let parsed: FileRef = serde_json::from_str(&json).unwrap();
assert_eq!(file_ref, parsed);
assert_eq!(parsed.index_version, Some(index_version));
// Test with None
let file_ref_none = FileRef::new(region_id, file_id, None);
let json_none = serde_json::to_string(&file_ref_none).unwrap();
let parsed_none: FileRef = serde_json::from_str(&json_none).unwrap();
assert_eq!(file_ref_none, parsed_none);
assert_eq!(parsed_none.index_version, None);
}
}