From dec0d522f8a66f398193e28462a28ec0ba80a6f8 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Wed, 24 Dec 2025 11:07:53 +0800
Subject: [PATCH] feat: gc versioned index (#7412)

* feat: add index version to file ref

Signed-off-by: discord9

* refactor wip

Signed-off-by: discord9

* wip

Signed-off-by: discord9

* update gc worker

Signed-off-by: discord9

* stuff

Signed-off-by: discord9

* gc report for index files

Signed-off-by: discord9

* fix: type

Signed-off-by: discord9

* stuff

Signed-off-by: discord9

* chore: clippy

Signed-off-by: discord9

* chore: metrics

Signed-off-by: discord9

* typo

Signed-off-by: discord9

* typo

Signed-off-by: discord9

* chore: naming

Signed-off-by: discord9

* docs: update explain

Signed-off-by: discord9

* test: parse file id/type from file path

Signed-off-by: discord9

* chore: change parse method visibility to crate

Signed-off-by: discord9

* pcr

Signed-off-by: discord9

* pcr

Signed-off-by: discord9

* chore

Signed-off-by: discord9

---------

Signed-off-by: discord9
---
 src/common/meta/src/instruction.rs     |  16 +-
 src/meta-srv/src/gc/ctx.rs             |   2 +-
 src/meta-srv/src/gc/mock.rs            |   1 +
 src/meta-srv/src/gc/mock/concurrent.rs |   6 +-
 src/meta-srv/src/gc/mock/err_handle.rs |   7 +-
 src/meta-srv/src/gc/procedure.rs       |   3 +-
 src/mito2/src/cache/file_cache.rs      |   2 +-
 src/mito2/src/gc.rs                    | 321 +++++++++++++++++--------
 src/mito2/src/gc/worker_test.rs        |  34 ++-
 src/mito2/src/manifest/action.rs       |  84 +++++--
 src/mito2/src/manifest/manager.rs      |   5 +-
 src/mito2/src/metrics.rs               |  14 ++
 src/mito2/src/sst/file.rs              |   8 +
 src/mito2/src/sst/file_purger.rs       |   2 +-
 src/mito2/src/sst/file_ref.rs          |  53 +++-
 src/mito2/src/sst/location.rs          |  79 +++++-
 src/store-api/src/storage/file.rs      |  86 ++++++-
 17 files changed, 552 insertions(+), 171 deletions(-)

diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 52dd43579b..44a6d920f9 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -834,7 +834,7 @@ impl InstructionReply {
 mod tests {
     use std::collections::HashSet;
 
-    use store_api::storage::FileId;
+    use store_api::storage::{FileId, FileRef};
 
     use super::*;
 
@@ -1209,12 +1209,14 @@ mod tests {
         let mut manifest = FileRefsManifest::default();
         let r0 = RegionId::new(1024, 1);
         let r1 = RegionId::new(1024, 2);
-        manifest
-            .file_refs
-            .insert(r0, HashSet::from([FileId::random()]));
-        manifest
-            .file_refs
-            .insert(r1, HashSet::from([FileId::random()]));
+        manifest.file_refs.insert(
+            r0,
+            HashSet::from([FileRef::new(r0, FileId::random(), None)]),
+        );
+        manifest.file_refs.insert(
+            r1,
+            HashSet::from([FileRef::new(r1, FileId::random(), None)]),
+        );
         manifest.manifest_version.insert(r0, 10);
         manifest.manifest_version.insert(r1, 20);
 
diff --git a/src/meta-srv/src/gc/ctx.rs b/src/meta-srv/src/gc/ctx.rs
index 7b1cfc68e1..a4c75d4cd6 100644
--- a/src/meta-srv/src/gc/ctx.rs
+++ b/src/meta-srv/src/gc/ctx.rs
@@ -194,7 +194,7 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
         }
 
         // Send GetFileRefs instructions to each datanode
-        let mut all_file_refs: HashMap<RegionId, HashSet<FileId>> = HashMap::new();
+        let mut all_file_refs: HashMap<RegionId, HashSet<FileRef>> = HashMap::new();
         let mut all_manifest_versions = HashMap::new();
 
         for (peer, regions) in datanode2query_regions {
diff --git a/src/meta-srv/src/gc/mock.rs b/src/meta-srv/src/gc/mock.rs
index 61ec515985..96eb1cdf51 100644
--- a/src/meta-srv/src/gc/mock.rs
+++ b/src/meta-srv/src/gc/mock.rs
@@ -53,6 +53,7 @@ pub fn new_empty_report_with(region_ids: impl IntoIterator<Item = RegionId>) -> GcReport {
     }
     GcReport {
         deleted_files,
+        deleted_indexes: HashMap::new(),
         need_retry_regions: HashSet::new(),
     }
 }
diff --git a/src/meta-srv/src/gc/mock/concurrent.rs b/src/meta-srv/src/gc/mock/concurrent.rs
index 2bef9b9896..0d5bf4af3f 100644
--- a/src/meta-srv/src/gc/mock/concurrent.rs
+++ b/src/meta-srv/src/gc/mock/concurrent.rs
@@ -454,7 +454,11 @@ async fn test_region_gc_concurrency_with_retryable_errors() {
             (
                 region_id,
                 // mock the actual gc report with deleted files when succeeded(even no files to delete)
-                GcReport::new(HashMap::from([(region_id, vec![])]), HashSet::new()),
+                GcReport::new(
+                    HashMap::from([(region_id, vec![])]),
+                    Default::default(),
+                    HashSet::new(),
+                ),
             )
         })
         .collect();
diff --git a/src/meta-srv/src/gc/mock/err_handle.rs b/src/meta-srv/src/gc/mock/err_handle.rs
index 952671006d..a447904440 100644
--- a/src/meta-srv/src/gc/mock/err_handle.rs
+++ b/src/meta-srv/src/gc/mock/err_handle.rs
@@ -20,7 +20,7 @@ use common_meta::datanode::RegionManifestInfo;
 use common_meta::peer::Peer;
 use common_telemetry::init_default_ut_logging;
 use store_api::region_engine::RegionRole;
-use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
+use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, RegionId};
 
 use crate::gc::mock::{
     MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat, new_empty_report_with,
@@ -60,7 +60,10 @@ async fn test_gc_regions_failure_handling() {
 
     let file_refs = FileRefsManifest {
         manifest_version: HashMap::from([(region_id, 1)]),
-        file_refs: HashMap::from([(region_id, HashSet::from([FileId::random()]))]),
+        file_refs: HashMap::from([(
+            region_id,
+            HashSet::from([FileRef::new(region_id, FileId::random(), None)]),
+        )]),
     };
 
     let ctx = Arc::new(
diff --git a/src/meta-srv/src/gc/procedure.rs b/src/meta-srv/src/gc/procedure.rs
index 4ddd606630..039e542cd0 100644
--- a/src/meta-srv/src/gc/procedure.rs
+++ b/src/meta-srv/src/gc/procedure.rs
@@ -356,8 +356,7 @@ impl BatchGcProcedure {
     }
 
         // Send GetFileRefs instructions to each datanode
-        let mut all_file_refs: HashMap<RegionId, HashSet<FileId>> =
-            HashMap::new();
+        let mut all_file_refs: HashMap<RegionId, HashSet<FileRef>> = HashMap::new();
         let mut all_manifest_versions = HashMap::new();
 
         for (peer, regions) in datanode2query_regions {
diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index 9392cbeaba..58a2dac588 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -727,7 +727,7 @@ impl fmt::Display for FileType {
 impl FileType {
     /// Parses the file type from string.
-    fn parse(s: &str) -> Option<FileType> {
+    pub(crate) fn parse(s: &str) -> Option<FileType> {
         match s {
             "parquet" => Some(FileType::Parquet),
             "puffin" => Some(FileType::Puffin(0)),
diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs
index ed8841794b..3b5e988295 100644
--- a/src/mito2/src/gc.rs
+++ b/src/mito2/src/gc.rs
@@ -28,28 +28,62 @@ use std::time::Duration;
 use common_meta::datanode::GcStat;
 use common_telemetry::{debug, error, info, warn};
 use common_time::Timestamp;
+use itertools::Itertools;
 use object_store::{Entry, Lister};
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt as _;
-use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
+use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
 use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
 use tokio_stream::StreamExt;
 
 use crate::access_layer::AccessLayerRef;
 use crate::cache::CacheManagerRef;
+use crate::cache::file_cache::FileType;
 use crate::config::MitoConfig;
 use crate::error::{
     DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
 };
-use crate::manifest::action::RegionManifest;
-use crate::metrics::GC_DELETE_FILE_CNT;
+use crate::manifest::action::{RegionManifest, RemovedFile};
+use crate::metrics::{GC_DELETE_FILE_CNT, GC_ORPHANED_INDEX_FILES, GC_SKIPPED_UNPARSABLE_FILES};
 use crate::region::{MitoRegionRef, RegionRoleState};
-use crate::sst::file::delete_files;
+use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index};
 use crate::sst::location::{self};
 
 #[cfg(test)]
 mod worker_test;
 
+/// Helper function to determine if a file should be deleted, based on common logic
+/// shared between Parquet and Puffin file types.
+fn should_delete_file(
+    is_in_manifest: bool,
+    is_in_tmp_ref: bool,
+    is_linger: bool,
+    is_eligible_for_delete: bool,
+    entry: &Entry,
+    unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
+) -> bool {
+    let is_known = is_linger || is_eligible_for_delete;
+
+    let is_unknown_linger_time_exceeded = || {
+        // if the file's expel time is unknown (because it does not appear in the delta manifest),
+        // we keep it for a while, using its last modified time.
+        // Note that unknown files use a different lingering time.
+        entry
+            .metadata()
+            .last_modified()
+            .map(|t| t < unknown_file_may_linger_until)
+            .unwrap_or(false)
+    };
+
+    !is_in_manifest
+        && !is_in_tmp_ref
+        && if is_known {
+            is_eligible_for_delete
+        } else {
+            is_unknown_linger_time_exceeded()
+        }
+}
+
 /// Limit the amount of concurrent GC jobs on the datanode
 pub struct GcLimiter {
     pub gc_job_limit: Arc<Semaphore>,
@@ -208,7 +242,7 @@ impl LocalGcWorker {
     }
 
     /// Get tmp ref files for all current regions
-    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileId>>> {
+    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
         let mut tmp_ref_files = HashMap::new();
         for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
             tmp_ref_files
@@ -230,6 +264,7 @@ impl LocalGcWorker {
         let now = std::time::Instant::now();
 
         let mut deleted_files = HashMap::new();
+        let mut deleted_indexes = HashMap::new();
         let tmp_ref_files = self.read_tmp_ref_files().await?;
         for (region_id, region) in &self.regions {
             let per_region_time = std::time::Instant::now();
@@ -247,7 +282,12 @@
                 .cloned()
                 .unwrap_or_else(HashSet::new);
             let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
-            deleted_files.insert(*region_id, files);
+            let index_files = files
+                .iter()
+                .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
+                .collect_vec();
+            deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
+            deleted_indexes.insert(*region_id, index_files);
             debug!(
                 "GC for region {} took {} secs.",
                 region_id,
@@ -260,6 +300,7 @@
         );
         let report = GcReport {
             deleted_files,
+            deleted_indexes,
             need_retry_regions: HashSet::new(),
         };
         Ok(report)
@@ -282,8 +323,8 @@ impl LocalGcWorker {
     pub async fn do_region_gc(
         &self,
         region: MitoRegionRef,
-        tmp_ref_files: &HashSet<FileId>,
-    ) -> Result<Vec<FileId>> {
+        tmp_ref_files: &HashSet<FileRef>,
+    ) -> Result<Vec<RemovedFile>> {
         let region_id = region.region_id();
         debug!("Doing gc for region {}", region_id);
 
@@ -311,64 +352,83 @@
             .map(|s| s.len())
            .sum::<usize>();
 
-        let in_used: HashSet<FileId> = current_files
-            .keys()
-            .cloned()
-            .chain(tmp_ref_files.clone().into_iter())
-            .collect();
+        let in_manifest = current_files
+            .iter()
+            .map(|(file_id, meta)| (*file_id, meta.index_version()))
+            .collect::<HashMap<_, _>>();
 
-        let unused_files = self
-            .list_to_be_deleted_files(region_id, &in_used, recently_removed_files, all_entries)
+        let in_tmp_ref = tmp_ref_files
+            .iter()
+            .map(|file_ref| (file_ref.file_id, file_ref.index_version))
+            .collect::<HashSet<_>>();
+
+        let deletable_files = self
+            .list_to_be_deleted_files(
+                region_id,
+                &in_manifest,
+                &in_tmp_ref,
+                recently_removed_files,
+                all_entries,
+            )
             .await?;
 
-        let unused_file_cnt = unused_files.len();
+        let unused_file_cnt = deletable_files.len();
         debug!(
-            "gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, In-used files: {}, recently removed files: {}, Unused files to delete: {} ",
+            "gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {} ",
             current_files.len(),
             tmp_ref_files.len(),
-            in_used.len(),
             removed_file_cnt,
-            unused_files.len()
+            deletable_files.len()
        );
 
-        // TODO(discord9): for now, ignore async index file as it's design is not stable, need to be improved once
-        // index file design is stable
-        let file_pairs: Vec<(FileId, u64)> =
-            unused_files.iter().map(|file_id| (*file_id, 0)).collect();
-        // TODO(discord9): gc worker need another major refactor to support versioned index files
-
         debug!(
             "Found {} unused index files to delete for region {}",
-            file_pairs.len(),
+            deletable_files.len(),
            region_id
         );
 
-        self.delete_files(region_id, &file_pairs).await?;
+        self.delete_files(region_id, &deletable_files).await?;
 
         debug!(
             "Successfully deleted {} unused files for region {}",
             unused_file_cnt, region_id
         );
 
-        // TODO(discord9): update region manifest about deleted files
-        self.update_manifest_removed_files(&region, unused_files.clone())
+        self.update_manifest_removed_files(&region, deletable_files.clone())
             .await?;
 
-        Ok(unused_files)
+        Ok(deletable_files)
     }
 
-    async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, u64)]) -> Result<()> {
+    async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
+        let mut index_ids = vec![];
+        let file_pairs = removed_files
+            .iter()
+            .filter_map(|f| match f {
+                RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
+                RemovedFile::Index(file_id, index_version) => {
+                    let region_index_id =
+                        RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
+                    index_ids.push(region_index_id);
+                    None
+                }
+            })
+            .collect_vec();
         delete_files(
             region_id,
-            file_ids,
+            &file_pairs,
             true,
             &self.access_layer,
             &self.cache_manager,
         )
         .await?;
 
+        for index_id in index_ids {
+            delete_index(index_id, &self.access_layer, &self.cache_manager).await?;
+        }
+
+        // FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate; there is no clean way to fix that for now
-        GC_DELETE_FILE_CNT.add(file_ids.len() as i64);
+        GC_DELETE_FILE_CNT.add(removed_files.len() as i64);
 
         Ok(())
     }
 
@@ -377,7 +437,7 @@
     async fn update_manifest_removed_files(
         &self,
         region: &MitoRegionRef,
-        deleted_files: Vec<FileId>,
+        deleted_files: Vec<RemovedFile>,
     ) -> Result<()> {
         let deleted_file_cnt = deleted_files.len();
         debug!(
@@ -403,12 +463,12 @@
     pub async fn get_removed_files_expel_times(
         &self,
         region_manifest: &Arc<RegionManifest>,
-    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
+    ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
         let mut ret = BTreeMap::new();
         for files in &region_manifest.removed_files.removed_files {
             let expel_time = Timestamp::new_millisecond(files.removed_at);
             let set = ret.entry(expel_time).or_insert_with(HashSet::new);
-            set.extend(files.file_ids.iter().cloned());
+            set.extend(files.files.iter().cloned());
         }
 
         Ok(ret)
@@ -535,75 +595,136 @@ impl LocalGcWorker {
         Ok(all_entries)
     }
 
-    /// Filter files to determine which ones can be deleted based on usage status and lingering time.
-    /// Returns a vector of file IDs that are safe to delete.
     fn filter_deletable_files(
         &self,
         entries: Vec<Entry>,
-        in_use_filenames: &HashSet<FileId>,
-        may_linger_filenames: &HashSet<&FileId>,
-        eligible_for_removal: &HashSet<&FileId>,
+        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
+        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
+        may_linger_files: &HashSet<&RemovedFile>,
+        eligible_for_delete: &HashSet<&RemovedFile>,
         unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
-    ) -> (Vec<FileId>, HashSet<FileId>) {
-        let mut all_unused_files_ready_for_delete = vec![];
-        let mut all_in_exist_linger_files = HashSet::new();
+    ) -> Vec<RemovedFile> {
+        let mut ready_for_delete = vec![];
+        // all grouped by file id for easier checking
+        let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
+            in_tmp_ref
+                .iter()
+                .fold(HashMap::new(), |mut acc, (file, version)| {
+                    let indices = acc.entry(*file).or_default();
+                    if let Some(version) = version {
+                        indices.insert(*version);
+                    }
+                    acc
+                });
+
+        let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
+            .iter()
+            .fold(HashMap::new(), |mut acc, file| {
+                let indices = acc.entry(file.file_id()).or_default();
+                indices.insert(file);
+                acc
+            });
+
+        let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
+            .iter()
+            .fold(HashMap::new(), |mut acc, file| {
+                let indices = acc.entry(file.file_id()).or_default();
+                indices.insert(file);
+                acc
+            });
 
         for entry in entries {
-            let file_id = match location::parse_file_id_from_path(entry.name()) {
-                Ok(file_id) => file_id,
+            let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
+                Ok((file_id, file_type)) => (file_id, file_type),
                 Err(err) => {
                     error!(err; "Failed to parse file id from path: {}", entry.name());
                     // if we can't parse the file id, it means it's not a sst or index file
                     // shouldn't delete it because we don't know what it is
+                    GC_SKIPPED_UNPARSABLE_FILES.inc();
                     continue;
                 }
            };
 
-            if may_linger_filenames.contains(&file_id) {
-                all_in_exist_linger_files.insert(file_id);
-            }
+            let should_delete = match file_type {
+                FileType::Parquet => {
+                    let is_in_manifest = in_manifest.contains_key(&file_id);
+                    let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
+                    let is_linger = may_linger_files.contains_key(&file_id);
+                    let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);
 
-            let should_delete = !in_use_filenames.contains(&file_id)
-                && !may_linger_filenames.contains(&file_id)
-                && {
-                    if !eligible_for_removal.contains(&file_id) {
-                        // if the file's expel time is unknown(because not appear in delta manifest), we keep it for a while
-                        // using it's last modified time
-                        // notice unknown files use a different lingering time
-                        entry
-                            .metadata()
-                            .last_modified()
-                            .map(|t| t < unknown_file_may_linger_until)
-                            .unwrap_or(false)
-                    } else {
-                        // if the file did appear in manifest delta(and passes previous predicate), we can delete it immediately
-                        true
-                    }
-                };
+                    should_delete_file(
+                        is_in_manifest,
+                        is_in_tmp_ref,
+                        is_linger,
+                        is_eligible_for_delete,
+                        &entry,
+                        unknown_file_may_linger_until,
+                    )
+                }
+                FileType::Puffin(version) => {
+                    // note: we need to check both the file id and the version here
+                    let is_in_manifest = in_manifest
+                        .get(&file_id)
+                        .map(|opt_ver| *opt_ver == Some(version))
+                        .unwrap_or(false);
+                    let is_in_tmp_ref = in_tmp_ref
+                        .get(&file_id)
+                        .map(|versions| versions.contains(&version))
+                        .unwrap_or(false);
+                    let is_linger = may_linger_files
+                        .get(&file_id)
+                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
+                        .unwrap_or(false);
+                    let is_eligible_for_delete = eligible_for_delete
+                        .get(&file_id)
+                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
+                        .unwrap_or(false);
+
+                    should_delete_file(
+                        is_in_manifest,
+                        is_in_tmp_ref,
+                        is_linger,
+                        is_eligible_for_delete,
+                        &entry,
+                        unknown_file_may_linger_until,
+                    )
+                }
+            };
 
             if should_delete {
-                all_unused_files_ready_for_delete.push(file_id);
+                let removed_file = match file_type {
+                    FileType::Parquet => {
+                        // note: this is because we don't track the index version for parquet files;
+                        // since entries come from listing, we can't get the index version from the path
+                        RemovedFile::File(file_id, None)
+                    }
+                    FileType::Puffin(version) => {
+                        GC_ORPHANED_INDEX_FILES.inc();
+                        RemovedFile::Index(file_id, version)
+                    }
+                };
+                ready_for_delete.push(removed_file);
             }
         }
-
-        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
+        ready_for_delete
     }
 
-    /// Concurrently list unused files in the region dir
-    /// because there may be a lot of files in the region dir
-    /// and listing them may take a long time.
+    /// List files to be deleted based on their presence in the manifest, temporary references, and recently removed files.
+    /// Returns a vector of `RemovedFile` that are eligible for deletion.
+    ///
+    /// When `full_file_listing` is false, this method will only delete (a subset of) the files tracked in
+    /// `recently_removed_files`, which significantly
+    /// improves performance. When `full_file_listing` is true, it reads from `all_entries` to find
+    /// and delete orphan files (files not tracked in the manifest).
     ///
-    /// When `full_file_listing` is false, this method will only delete files tracked in
-    /// `recently_removed_files` without performing expensive list operations, which significantly
-    /// improves performance. When `full_file_listing` is true, it performs a full listing to
-    /// find and delete orphan files.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
-        in_used: &HashSet<FileId>,
-        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
+        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
+        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
+        recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
         all_entries: Vec<Entry>,
-    ) -> Result<Vec<FileId>> {
+    ) -> Result<Vec<RemovedFile>> {
        let now = chrono::Utc::now();
        let may_linger_until = self
            .opt
@@ -634,8 +755,10 @@
        };
        debug!("may_linger_files: {:?}", may_linger_files);

-        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();
+        let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();

+        // known files (tracked in the removed-files field) that are eligible for removal
+        // (i.e. they have passed the lingering time)
        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
@@ -646,12 +769,24 @@
        if !self.full_file_listing {
            // Only delete files that:
            // 1. Are in recently_removed_files (tracked in manifest)
-            // 2. Are not in use
+            // 2. Are not in use (in manifest or tmp ref)
            // 3. Have passed the lingering time
-            let files_to_delete: Vec<FileId> = eligible_for_removal
+            let files_to_delete: Vec<RemovedFile> = eligible_for_removal
                .iter()
-                .filter(|file_id| !in_used.contains(*file_id))
-                .map(|&f| *f)
+                .filter(|file_id| {
+                    let in_use = match file_id {
+                        RemovedFile::File(file_id, index_version) => {
+                            in_manifest.get(file_id) == Some(index_version)
+                                || in_tmp_ref.contains(&(*file_id, *index_version))
+                        }
+                        RemovedFile::Index(file_id, index_version) => {
+                            in_manifest.get(file_id) == Some(&Some(*index_version))
+                                || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
+                        }
+                    };
+                    !in_use
+                })
+                .map(|&f| f.clone())
                .collect();

            info!(
@@ -666,16 +801,14 @@
        // Full file listing mode: get the full list of files from object store

        // Step 3: Filter files to determine which ones can be deleted
-        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
-            .filter_deletable_files(
-                all_entries,
-                in_used,
-                &may_linger_filenames,
-                &eligible_for_removal,
-                unknown_file_may_linger_until,
-            );
-
-        debug!("All in exist linger files: {:?}", all_in_exist_linger_files);
+        let all_unused_files_ready_for_delete = self.filter_deletable_files(
+            all_entries,
+            in_manifest,
+            in_tmp_ref,
+            &all_may_linger_files,
+            &eligible_for_removal,
+            unknown_file_may_linger_until,
+        );

        Ok(all_unused_files_ready_for_delete)
    }
diff --git a/src/mito2/src/gc/worker_test.rs b/src/mito2/src/gc/worker_test.rs
index 6e3f5288c0..2d5a835aab 100644
--- a/src/mito2/src/gc/worker_test.rs
+++ b/src/mito2/src/gc/worker_test.rs
@@ -19,12 +19,13 @@ use api::v1::Rows;
 use common_telemetry::init_default_ut_logging;
 use store_api::region_engine::RegionEngine as _;
 use store_api::region_request::{RegionCompactRequest, RegionRequest};
-use store_api::storage::{FileRefsManifest, RegionId};
+use store_api::storage::{FileRef, FileRefsManifest, RegionId};
 
 use crate::config::MitoConfig;
 use crate::engine::MitoEngine;
 use crate::engine::compaction_test::{delete_and_flush, put_and_flush};
 use crate::gc::{GcConfig, LocalGcWorker};
+use crate::manifest::action::RemovedFile;
 use crate::region::MitoRegionRef;
 use crate::test_util::{
     CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, rows_schema,
@@ -120,9 +121,9 @@ async fn test_gc_worker_basic_truncate() {
     let manifest = region.manifest_ctx.manifest().await;
     assert!(
         manifest.removed_files.removed_files[0]
-            .file_ids
-            .contains(&to_be_deleted_file_id)
-            && manifest.removed_files.removed_files[0].file_ids.len() == 1
+            .files
.contains(&RemovedFile::File(to_be_deleted_file_id, None)) + && manifest.removed_files.removed_files[0].files.len() == 1 && manifest.files.is_empty(), "Manifest after truncate: {:?}", manifest @@ -214,9 +215,9 @@ async fn test_gc_worker_truncate_with_ref() { let manifest = region.manifest_ctx.manifest().await; assert!( manifest.removed_files.removed_files[0] - .file_ids - .contains(&to_be_deleted_file_id) - && manifest.removed_files.removed_files[0].file_ids.len() == 1 + .files + .contains(&RemovedFile::File(to_be_deleted_file_id, None)) + && manifest.removed_files.removed_files[0].files.len() == 1 && manifest.files.is_empty(), "Manifest after truncate: {:?}", manifest @@ -225,7 +226,11 @@ async fn test_gc_worker_truncate_with_ref() { let regions = BTreeMap::from([(region_id, region.clone())]); let file_ref_manifest = FileRefsManifest { - file_refs: [(region_id, HashSet::from([to_be_deleted_file_id]))].into(), + file_refs: [( + region_id, + HashSet::from([FileRef::new(region_id, to_be_deleted_file_id, None)]), + )] + .into(), manifest_version: [(region_id, version)].into(), }; let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await; @@ -235,7 +240,7 @@ async fn test_gc_worker_truncate_with_ref() { let manifest = region.manifest_ctx.manifest().await; assert!( - manifest.removed_files.removed_files[0].file_ids.len() == 1 && manifest.files.is_empty(), + manifest.removed_files.removed_files[0].files.len() == 1 && manifest.files.is_empty(), "Manifest: {:?}", manifest ); @@ -300,7 +305,7 @@ async fn test_gc_worker_basic_compact() { let region = engine.get_region(region_id).unwrap(); let manifest = region.manifest_ctx.manifest().await; - assert_eq!(manifest.removed_files.removed_files[0].file_ids.len(), 3); + assert_eq!(manifest.removed_files.removed_files[0].files.len(), 3); let version = manifest.manifest_version; @@ -376,7 +381,7 @@ async fn test_gc_worker_compact_with_ref() { let region = engine.get_region(region_id).unwrap(); let manifest = region.manifest_ctx.manifest().await; - assert_eq!(manifest.removed_files.removed_files[0].file_ids.len(), 3); + assert_eq!(manifest.removed_files.removed_files[0].files.len(), 3); let version = manifest.manifest_version; @@ -385,9 +390,12 @@ async fn test_gc_worker_compact_with_ref() { file_refs: HashMap::from([( region_id, manifest.removed_files.removed_files[0] - .file_ids + .files .iter() - .cloned() + .map(|removed_file| match removed_file { + RemovedFile::File(file_id, v) => FileRef::new(region_id, *file_id, *v), + RemovedFile::Index(file_id, v) => FileRef::new(region_id, *file_id, Some(*v)), + }) .collect(), )]), manifest_version: [(region_id, version)].into(), diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index dedb228e25..3478c7df48 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::ManifestVersion; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{FileId, RegionId, SequenceNumber}; +use store_api::storage::{FileId, IndexVersion, RegionId, SequenceNumber}; use strum::Display; use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu}; @@ -193,17 +193,27 @@ impl RegionManifestBuilder { pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) { self.manifest_version = manifest_version; + + let mut removed_files = vec![]; for file in edit.files_to_add { - 
self.files.insert(file.file_id, file);
+            if let Some(old_file) = self.files.insert(file.file_id, file.clone())
+                && let Some(old_index) = old_file.index_version()
+                && !old_file.is_index_up_to_date(&file)
+            {
+                // The old file has an index that is now outdated.
+                removed_files.push(RemovedFile::Index(old_file.file_id, old_index));
+            }
         }
-        self.removed_files.add_removed_files(
+        removed_files.extend(
             edit.files_to_remove
                 .iter()
-                .map(|meta| meta.file_id)
-                .collect(),
-            edit.timestamp_ms
-                .unwrap_or_else(|| Utc::now().timestamp_millis()),
+                .map(|f| RemovedFile::File(f.file_id, f.index_version())),
         );
+        let at = edit
+            .timestamp_ms
+            .unwrap_or_else(|| Utc::now().timestamp_millis());
+        self.removed_files.add_removed_files(removed_files, at);
+
         for file in edit.files_to_remove {
             self.files.remove(&file.file_id);
         }
@@ -236,7 +246,10 @@ impl RegionManifestBuilder {
                 self.flushed_sequence = truncated_sequence;
                 self.truncated_entry_id = Some(truncated_entry_id);
                 self.removed_files.add_removed_files(
-                    self.files.values().map(|meta| meta.file_id).collect(),
+                    self.files
+                        .values()
+                        .map(|f| RemovedFile::File(f.file_id, f.index_version()))
+                        .collect(),
                     truncate
                         .timestamp_ms
                         .unwrap_or_else(|| Utc::now().timestamp_millis()),
@@ -245,7 +258,10 @@ impl RegionManifestBuilder {
             }
             TruncateKind::Partial { files_to_remove } => {
                 self.removed_files.add_removed_files(
-                    files_to_remove.iter().map(|meta| meta.file_id).collect(),
+                    files_to_remove
+                        .iter()
+                        .map(|f| RemovedFile::File(f.file_id, f.index_version()))
+                        .collect(),
                     truncate
                         .timestamp_ms
                         .unwrap_or_else(|| Utc::now().timestamp_millis()),
@@ -295,20 +311,22 @@ pub struct RemovedFilesRecord {
 
 impl RemovedFilesRecord {
     /// Clear the actually deleted files from the list of removed files
-    pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
+    pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
         let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
         for files in self.removed_files.iter_mut() {
-            files.file_ids.retain(|fid| !deleted_file_set.contains(fid));
+            files
+                .files
+                .retain(|removed| !deleted_file_set.contains(removed));
         }
-        self.removed_files.retain(|fs| !fs.file_ids.is_empty());
+        self.removed_files.retain(|fs| !fs.files.is_empty());
     }
 
     pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
         let cnt = self
             .removed_files
             .iter()
-            .map(|r| r.file_ids.len() as u64)
+            .map(|r| r.files.len() as u64)
             .sum();
         stats
             .file_removed_cnt
@@ -322,18 +340,42 @@ pub struct RemovedFiles {
     /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
     pub removed_at: i64,
     /// The set of file ids that are removed.
-    pub file_ids: HashSet<FileId>,
+    pub files: HashSet<RemovedFile>,
+}
+
+/// A removed file, which can be a data file (optionally paired with an index file) or an outdated index file.
+#[derive(Serialize, Deserialize, Hash, Clone, Debug, PartialEq, Eq)]
+pub enum RemovedFile {
+    File(FileId, Option<IndexVersion>),
+    Index(FileId, IndexVersion),
+}
+
+impl RemovedFile {
+    pub fn file_id(&self) -> FileId {
+        match self {
+            RemovedFile::File(file_id, _) => *file_id,
+            RemovedFile::Index(file_id, _) => *file_id,
+        }
+    }
+
+    pub fn index_version(&self) -> Option<IndexVersion> {
+        match self {
+            RemovedFile::File(_, index_version) => *index_version,
+            RemovedFile::Index(_, index_version) => Some(*index_version),
+        }
+    }
 }
 
 impl RemovedFilesRecord {
     /// Add a record of removed files with the current timestamp.
-    pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
-        if file_ids.is_empty() {
+    pub fn add_removed_files(&mut self, removed: Vec<RemovedFile>, at: i64) {
+        if removed.is_empty() {
             return;
         }
+        let files = removed.into_iter().collect();
         self.removed_files.push(RemovedFiles {
             removed_at: at,
-            file_ids,
+            files,
         });
     }
 
@@ -738,10 +780,10 @@ mod tests {
             removed_files: RemovedFilesRecord {
                 removed_files: vec![RemovedFiles {
                     removed_at: 0,
-                    file_ids: HashSet::from([FileId::parse_str(
-                        "4b220a70-2b03-4641-9687-b65d94641208",
-                    )
-                    .unwrap()]),
+                    files: HashSet::from([RemovedFile::File(
+                        FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
+                        None,
+                    )]),
                 }],
             },
             sst_format: FormatType::PrimaryKey,
diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs
index 043a1293d9..037eee8c71 100644
--- a/src/mito2/src/manifest/manager.rs
+++ b/src/mito2/src/manifest/manager.rs
@@ -21,7 +21,6 @@ use futures::TryStreamExt;
 use object_store::ObjectStore;
 use snafu::{OptionExt, ResultExt, ensure};
 use store_api::metadata::RegionMetadataRef;
-use store_api::storage::FileId;
 use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
 
 use crate::cache::manifest_cache::ManifestCache;
@@ -31,7 +30,7 @@ use crate::error::{
 };
 use crate::manifest::action::{
     RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
-    RegionMetaAction, RegionMetaActionList,
+    RegionMetaAction, RegionMetaActionList, RemovedFile,
 };
 use crate::manifest::checkpointer::Checkpointer;
 use crate::manifest::storage::{
@@ -589,7 +588,7 @@ impl RegionManifestManager {
     }
 
     /// Clear deleted files from manifest's `removed_files` field without update version. Notice if datanode exit before checkpoint then new manifest by open region may still contain these deleted files, which is acceptable for gc process.
-    pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
+    pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
         let mut manifest = (*self.manifest()).clone();
         manifest.removed_files.clear_deleted_files(deleted_files);
         self.set_manifest(Arc::new(manifest));
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index be5f4945fd..82cd441b57 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -509,6 +509,20 @@ lazy_static! {
             "mito gc deleted file count",
         ).unwrap();
 
+    /// Counter for the number of unparsable files skipped by GC.
+    pub static ref GC_SKIPPED_UNPARSABLE_FILES: IntCounter =
+        register_int_counter!(
+            "greptime_mito_gc_skipped_unparsable_files",
+            "mito gc skipped unparsable files count",
+        ).unwrap();
+
+    /// Counter for the number of orphaned index files found by GC.
+    pub static ref GC_ORPHANED_INDEX_FILES: IntCounter =
+        register_int_counter!(
+            "greptime_mito_gc_orphaned_index_files",
+            "mito gc orphaned index files count",
+        ).unwrap();
+
     /// Total number of files downloaded during cache fill on region open.
     pub static ref CACHE_FILL_DOWNLOADED_FILES: IntCounter = register_int_counter!(
         "mito_cache_fill_downloaded_files",
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index 94209d7b0c..ccd6e931e2 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -312,6 +312,14 @@ impl FileMeta {
         !self.available_indexes.is_empty()
     }
 
+    pub fn index_version(&self) -> Option<IndexVersion> {
+        if self.exists_index() {
+            Some(self.index_version)
+        } else {
+            None
+        }
+    }
+
     /// Whether the index file is up-to-date comparing to another file meta.
    pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
        self.exists_index()
            && other.exists_index()
            && self.index_version >= other.index_version
diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs
index 5b23f3e069..7075acfce1 100644
--- a/src/mito2/src/sst/file_purger.rs
+++ b/src/mito2/src/sst/file_purger.rs
@@ -178,8 +178,8 @@ impl FilePurger for ObjectStoreFilePurger {
             // if not on local file system, instead inform the global file purger to remove the file reference.
             // notice that no matter whether the file is deleted or not, we need to remove the reference
             // because the file is no longer in use nonetheless.
+            // for the same reason, we don't care about index_outdated here.
             self.file_ref_manager.remove_file(&file_meta);
-            // TODO(discord9): consider impl a .tombstone file to reduce files needed to list
     }
 
     fn new_file(&self, file_meta: &FileMeta) {
diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs
index 9e69bd42cf..cdc3b8370e 100644
--- a/src/mito2/src/sst/file_ref.rs
+++ b/src/mito2/src/sst/file_ref.rs
@@ -91,7 +91,7 @@ impl FileReferenceManager {
         // get from in memory file handles
         for region_id in query_regions.iter().map(|r| r.region_id()) {
             if let Some(files) = self.ref_file_set(region_id) {
-                ref_files.insert(region_id, files.into_iter().map(|f| f.file_id).collect());
+                ref_files.insert(region_id, files);
             }
         }
 
@@ -108,10 +108,17 @@
             let manifest = related_region.manifest_ctx.manifest().await;
             for meta in manifest.files.values() {
                 if queries.contains(&meta.region_id) {
+                    // since gc can't happen concurrently with repartition
+                    // (both the queries and related_region acquire the region read lock), there is
+                    // no need to worry about a staging manifest from repartition here.
                     ref_files
                         .entry(meta.region_id)
                         .or_insert_with(HashSet::new)
-                        .insert(meta.file_id);
+                        .insert(FileRef::new(
+                            meta.region_id,
+                            meta.file_id,
+                            meta.index_version(),
+                        ));
                 }
             }
             // not sure if related region's manifest version is needed, but record it for now.
@@ -132,7 +139,11 @@ impl FileReferenceManager {
         let region_id = file_meta.region_id;
         let mut is_new = false;
         {
-            let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
+            let file_ref = FileRef::new(
+                file_meta.region_id,
+                file_meta.file_id,
+                file_meta.index_version(),
+            );
             self.files_per_region
                 .entry(region_id)
                 .and_modify(|refs| {
@@ -157,7 +168,7 @@ impl FileReferenceManager {
     /// If the reference count reaches zero, the file reference will be removed from the manager.
     pub fn remove_file(&self, file_meta: &FileMeta) {
         let region_id = file_meta.region_id;
-        let file_ref = FileRef::new(region_id, file_meta.file_id);
+        let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version());
 
         let mut remove_table_entry = false;
         let mut remove_file_ref = false;
@@ -247,13 +258,23 @@
                 .get(&file_meta.region_id)
                 .unwrap()
                 .files,
-            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
+            HashMap::from_iter([(
+                FileRef::new(
+                    file_meta.region_id,
+                    file_meta.file_id,
+                    file_meta.index_version()
+                ),
+                1
+            )])
         );
 
         file_ref_mgr.add_file(&file_meta);
 
-        let expected_region_ref_manifest =
-            HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
+        let expected_region_ref_manifest = HashSet::from_iter([FileRef::new(
+            file_meta.region_id,
+            file_meta.file_id,
+            file_meta.index_version(),
+        )]);
 
         assert_eq!(
             file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
@@ -266,7 +287,14 @@
                 .get(&file_meta.region_id)
                 .unwrap()
                 .files,
-            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
+            HashMap::from_iter([(
+                FileRef::new(
+                    file_meta.region_id,
+                    file_meta.file_id,
+                    file_meta.index_version()
+                ),
+                2
+            )])
         );
 
         assert_eq!(
@@ -282,7 +310,14 @@
                 .get(&file_meta.region_id)
                 .unwrap()
                 .files,
-            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
+            HashMap::from_iter([(
+                FileRef::new(
+                    file_meta.region_id,
+                    file_meta.file_id,
+                    file_meta.index_version()
+                ),
+                1
+            )])
         );
 
         assert_eq!(
diff --git a/src/mito2/src/sst/location.rs b/src/mito2/src/sst/location.rs
index f3d9e1bdeb..5bff4a6d4c 100644
--- a/src/mito2/src/sst/location.rs
+++ b/src/mito2/src/sst/location.rs
@@ -19,6 +19,7 @@ use store_api::path_utils::region_name;
 use store_api::region_request::PathType;
 use store_api::storage::{FileId, RegionId};
 
+use crate::cache::file_cache::FileType;
 use crate::error::UnexpectedSnafu;
 use crate::sst::file::{RegionFileId, RegionIndexId};
 
@@ -110,31 +111,29 @@ pub fn parse_index_file_info(filepath: &str) -> crate::error::Result<(FileId, u64)> {
     }
 }
 
-/// Get RegionFileId from sst or index filename
-pub fn parse_file_id_from_path(filepath: &str) -> crate::error::Result<FileId> {
+pub fn parse_file_id_type_from_path(filepath: &str) -> crate::error::Result<(FileId, FileType)> {
     let filename = filepath.rsplit('/').next().context(UnexpectedSnafu {
         reason: format!("invalid file path: {}", filepath),
     })?;
+    // get the part before the first '.'
     let parts: Vec<&str> = filename.split('.').collect();
-    if parts.len() != 2 {
+    if parts.len() < 2 {
         return UnexpectedSnafu {
             reason: format!("invalid file name: {}", filename),
         }
        .fail();
     }
-    if parts[1] != "parquet" && parts[1] != "puffin" {
-        return UnexpectedSnafu {
-            reason: format!("invalid file extension: {}", parts[1]),
-        }
-        .fail();
-    }
     let file_id = parts[0];
-    FileId::parse_str(file_id).map_err(|e| {
+    let file_id = FileId::parse_str(file_id).map_err(|e| {
         UnexpectedSnafu {
             reason: format!("invalid file id: {}, err: {}", file_id, e),
         }
         .build()
-    })
+    })?;
+    let file_type = FileType::parse(parts[1..].join(".").as_str()).context(UnexpectedSnafu {
+        reason: format!("invalid file type in file name: {}", filename),
+    })?;
+    Ok((file_id, file_type))
 }
 
 #[cfg(test)]
@@ -220,4 +219,62 @@ mod tests {
         assert_eq!(result.0.to_string(), file_id.to_string());
         assert_eq!(result.1, 42);
     }
+
+    #[test]
+    fn test_parse_file_id_type_from_path() {
+        use crate::cache::file_cache::FileType;
+
+        // Test parquet file
+        let file_id = FileId::random();
+        let path = format!("table_dir/1_0000000002/data/{}.parquet", file_id);
+        let result = parse_file_id_type_from_path(&path).unwrap();
+        assert_eq!(result.0.to_string(), file_id.to_string());
+        assert_eq!(result.1, FileType::Parquet);
+
+        // Test puffin file (legacy format, version 0)
+        let file_id = FileId::random();
+        let path = format!("table_dir/1_0000000002/index/{}.puffin", file_id);
+        let result = parse_file_id_type_from_path(&path).unwrap();
+        assert_eq!(result.0.to_string(), file_id.to_string());
+        assert_eq!(result.1, FileType::Puffin(0));
+
+        // Test versioned puffin file
+        let file_id = FileId::random();
+        let path = format!("table_dir/1_0000000002/index/{}.1.puffin", file_id);
+        let result = parse_file_id_type_from_path(&path).unwrap();
+        assert_eq!(result.0.to_string(), file_id.to_string());
+        assert_eq!(result.1, FileType::Puffin(1));
+
+        // Test with different path types
+        let file_id = FileId::random();
+        let path = format!("table_dir/1_0000000002/metadata/{}.parquet", file_id);
+        let result = parse_file_id_type_from_path(&path).unwrap();
+        assert_eq!(result.0.to_string(), file_id.to_string());
+        assert_eq!(result.1, FileType::Parquet);
+
+        // Test with bare path type
+        let file_id = FileId::random();
+        let path = format!("table_dir/1_0000000002/{}.parquet", file_id);
+        let result = parse_file_id_type_from_path(&path).unwrap();
+        assert_eq!(result.0.to_string(), file_id.to_string());
+        assert_eq!(result.1, FileType::Parquet);
+
+        // Test error cases
+        // Invalid file extension
+        let result = parse_file_id_type_from_path("table_dir/1_0000000002/data/test.invalid");
+        assert!(result.is_err());
+
+        // Invalid file ID
+        let result =
+            parse_file_id_type_from_path("table_dir/1_0000000002/data/invalid-file-id.parquet");
+        assert!(result.is_err());
+
+        // No file extension
+        let result = parse_file_id_type_from_path("table_dir/1_0000000002/data/test");
+        assert!(result.is_err());
+
+        // Empty filename
+        let result = parse_file_id_type_from_path("table_dir/1_0000000002/data/");
+        assert!(result.is_err());
+    }
 }
diff --git a/src/store-api/src/storage/file.rs b/src/store-api/src/storage/file.rs
index bb7490ccf5..f55f081224 100644
--- a/src/store-api/src/storage/file.rs
+++ b/src/store-api/src/storage/file.rs
@@ -77,11 +77,16 @@ impl FromStr for FileId {
 pub struct FileRef {
     pub region_id: RegionId,
     pub file_id: FileId,
+    pub index_version: Option<IndexVersion>,
 }
 
 impl FileRef {
-    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
-        Self { region_id, file_id }
+    pub fn new(region_id: RegionId, file_id: FileId, index_version: Option<IndexVersion>) -> Self {
+        Self {
+            region_id,
+            file_id,
+            index_version,
+        }
     }
 }
 
@@ -89,7 +94,7 @@ impl FileRef {
 /// Also record the manifest version when these tmp files are read.
 #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
 pub struct FileRefsManifest {
-    pub file_refs: HashMap<RegionId, HashSet<FileId>>,
+    pub file_refs: HashMap<RegionId, HashSet<FileRef>>,
     /// Manifest version when this manifest is read for it's files
     pub manifest_version: HashMap<RegionId, ManifestVersion>,
 }
 
@@ -97,7 +102,9 @@ pub struct FileRefsManifest {
 #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct GcReport {
     /// deleted files per region
+    /// TODO(discord9): change to `RemovedFile`?
     pub deleted_files: HashMap<RegionId, Vec<FileId>>,
+    pub deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
     /// Regions that need retry in next gc round, usually because their tmp ref files are outdated
     pub need_retry_regions: HashSet<RegionId>,
 }
 
@@ -105,10 +112,12 @@ pub struct GcReport {
 impl GcReport {
     pub fn new(
         deleted_files: HashMap<RegionId, Vec<FileId>>,
+        deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
         need_retry_regions: HashSet<RegionId>,
     ) -> Self {
         Self {
             deleted_files,
+            deleted_indexes,
             need_retry_regions,
         }
     }
@@ -162,8 +171,12 @@ mod tests {
         let mut manifest = FileRefsManifest::default();
         let r0 = RegionId::new(1024, 1);
         let r1 = RegionId::new(1024, 2);
-        manifest.file_refs.insert(r0, [FileId::random()].into());
-        manifest.file_refs.insert(r1, [FileId::random()].into());
+        manifest
+            .file_refs
+            .insert(r0, [FileRef::new(r0, FileId::random(), None)].into());
+        manifest
+            .file_refs
+            .insert(r1, [FileRef::new(r1, FileId::random(), None)].into());
         manifest.manifest_version.insert(r0, 10);
         manifest.manifest_version.insert(r1, 20);
 
@@ -171,4 +184,67 @@ mod tests {
         let parsed: FileRefsManifest = serde_json::from_str(&json).unwrap();
         assert_eq!(manifest, parsed);
     }
+
+    #[test]
+    fn test_file_ref_new() {
+        let region_id = RegionId::new(1024, 1);
+        let file_id = FileId::random();
+
+        // Test with Some(index_version)
+        let index_version: IndexVersion = 42;
+        let file_ref = FileRef::new(region_id, file_id, Some(index_version));
+        assert_eq!(file_ref.region_id, region_id);
+        assert_eq!(file_ref.file_id, file_id);
+        assert_eq!(file_ref.index_version, Some(index_version));
+
+        // Test with None
+        let file_ref_none = FileRef::new(region_id, file_id, None);
+        assert_eq!(file_ref_none.region_id, region_id);
+        assert_eq!(file_ref_none.file_id, file_id);
+        assert_eq!(file_ref_none.index_version, None);
+    }
+
+    #[test]
+    fn test_file_ref_equality() {
+        let region_id = RegionId::new(1024, 1);
+        let file_id = FileId::random();
+
+        let file_ref1 = FileRef::new(region_id, file_id, Some(10));
+        let file_ref2 = FileRef::new(region_id, file_id, Some(10));
+        let file_ref3 = FileRef::new(region_id, file_id, Some(20));
+        let file_ref4 = FileRef::new(region_id, file_id, None);
+
+        assert_eq!(file_ref1, file_ref2);
+        assert_ne!(file_ref1, file_ref3);
+        assert_ne!(file_ref1, file_ref4);
+        assert_ne!(file_ref3, file_ref4);
+
+        // Test equality with Some(0) vs None
+        let file_ref_zero = FileRef::new(region_id, file_id, Some(0));
+        assert_ne!(file_ref_zero, file_ref4);
+    }
+
+    #[test]
+    fn test_file_ref_serialization() {
+        let region_id = RegionId::new(1024, 1);
+        let file_id = FileId::random();
+
+        // Test with Some(index_version)
+        let index_version: IndexVersion = 12345;
+        let file_ref = FileRef::new(region_id, file_id, Some(index_version));
+
+        let json = serde_json::to_string(&file_ref).unwrap();
+        let parsed: FileRef = serde_json::from_str(&json).unwrap();
+
+        assert_eq!(file_ref, parsed);
+
assert_eq!(parsed.index_version, Some(index_version)); + + // Test with None + let file_ref_none = FileRef::new(region_id, file_id, None); + let json_none = serde_json::to_string(&file_ref_none).unwrap(); + let parsed_none: FileRef = serde_json::from_str(&json_none).unwrap(); + + assert_eq!(file_ref_none, parsed_none); + assert_eq!(parsed_none.index_version, None); + } }