diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index cde76c1abb..d1b84acdbf 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -37,7 +37,7 @@ use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu
 use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
 use crate::read::{FlatSource, Source};
 use crate::region::options::IndexOptions;
-use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
+use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId, delete_index};
 use crate::sst::index::IndexerBuilderImpl;
 use crate::sst::index::intermediate::IntermediateManager;
 use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index af34503d0a..75705258ab 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -538,38 +538,99 @@ pub async fn delete_files(
     );
 
     for (file_id, index_version) in file_ids {
-        if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
-            // Removes index file from the cache.
-            if delete_index {
-                write_cache
-                    .remove(IndexKey::new(
-                        region_id,
-                        *file_id,
-                        FileType::Puffin(*index_version),
-                    ))
-                    .await;
-            }
-
-            // Remove the SST file from the cache.
-            write_cache
-                .remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
-                .await;
-        }
-
-        // Purges index content in the stager.
-        if let Err(e) = access_layer
-            .puffin_manager_factory()
-            .purge_stager(RegionIndexId::new(
-                RegionFileId::new(region_id, *file_id),
-                *index_version,
-            ))
-            .await
-        {
-            error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
-                    file_id, index_version, region_id);
-        }
+        purge_index_write_cache_stager(
+            region_id,
+            delete_index,
+            access_layer,
+            cache_manager,
+            file_id,
+            index_version,
+        )
+        .await;
     }
     Ok(())
 }
+
+/// Deletes one version of an SST's index file from storage and drops cached copies of it.
+///
+/// The Puffin entry in the write cache and the staged index content are purged even when the
+/// storage deletion fails; the storage error is then returned to the caller. The Parquet
+/// entry of the SST stays in the write cache because only the index is being removed.
+pub async fn delete_index(
+    region_id: RegionId,
+    file_id: FileId,
+    index_version: u64,
+    access_layer: &AccessLayerRef,
+    cache_manager: &Option<CacheManagerRef>,
+) -> crate::error::Result<()> {
+    let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
+    let deleted = access_layer.delete_index(&index_id).await;
+
+    if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
+        write_cache
+            .remove(IndexKey::new(
+                region_id,
+                file_id,
+                FileType::Puffin(index_version),
+            ))
+            .await;
+    }
+    purge_index_stager(region_id, access_layer, &file_id, &index_version).await;
+
+    deleted
+}
+
+/// Drops the cached copies of an SST file and of one version of its index.
+///
+/// The Puffin entry is removed from the write cache only when `delete_index` is true, the
+/// Parquet entry is always removed, and the staged index content is purged in either case.
+pub async fn purge_index_write_cache_stager(
+    region_id: RegionId,
+    delete_index: bool,
+    access_layer: &AccessLayerRef,
+    cache_manager: &Option<CacheManagerRef>,
+    file_id: &FileId,
+    index_version: &u64,
+) {
+    if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
+        // Removes index file from the cache.
+        if delete_index {
+            write_cache
+                .remove(IndexKey::new(
+                    region_id,
+                    *file_id,
+                    FileType::Puffin(*index_version),
+                ))
+                .await;
+        }
+
+        // Remove the SST file from the cache.
+        write_cache
+            .remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
+            .await;
+    }
+
+    purge_index_stager(region_id, access_layer, file_id, index_version).await;
+}
+
+/// Purges the staged content of one index version; a failure is logged, not returned.
+async fn purge_index_stager(
+    region_id: RegionId,
+    access_layer: &AccessLayerRef,
+    file_id: &FileId,
+    index_version: &u64,
+) {
+    if let Err(e) = access_layer
+        .puffin_manager_factory()
+        .purge_stager(RegionIndexId::new(
+            RegionFileId::new(region_id, *file_id),
+            *index_version,
+        ))
+        .await
+    {
+        error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
+            file_id, index_version, region_id);
+    }
+}
 
 #[cfg(test)]
diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs
index 710ba43677..62d5be1550 100644
--- a/src/mito2/src/sst/file_purger.rs
+++ b/src/mito2/src/sst/file_purger.rs
@@ -16,15 +16,23 @@ use std::fmt;
 use std::sync::Arc;
 
 use common_telemetry::error;
+use dashmap::DashMap;
 use store_api::storage::IndexVersion;
 
 use crate::access_layer::AccessLayerRef;
 use crate::cache::CacheManagerRef;
 use crate::error::Result;
 use crate::schedule::scheduler::SchedulerRef;
-use crate::sst::file::{FileMeta, RegionIndexId, delete_files};
+use crate::sst::file::{FileMeta, RegionIndexId, delete_files, delete_index};
 use crate::sst::file_ref::FileReferenceManagerRef;
 
+/// Reference count of one index file version plus whether it has been marked as deleted.
+#[derive(Debug, Default)]
+struct IndexRefInfo {
+    ref_count: usize,
+    is_deleted: bool,
+}
+
 /// A worker to delete files in background.
 pub trait FilePurger: Send + Sync + fmt::Debug {
     /// Send a request to remove the file.
@@ -65,6 +73,8 @@ pub struct LocalFilePurger {
     scheduler: SchedulerRef,
     sst_layer: AccessLayerRef,
     cache_manager: Option<CacheManagerRef>,
+    /// Reference count and deletion mark of every tracked index file, keyed by index id.
+    index_refs: Arc<DashMap<RegionIndexId, IndexRefInfo>>,
 }
 
 impl fmt::Debug for LocalFilePurger {
@@ -123,6 +133,7 @@ impl LocalFilePurger {
             scheduler,
             sst_layer,
             cache_manager,
+            index_refs: Arc::new(DashMap::new()),
         }
     }
 
@@ -131,6 +142,72 @@ impl LocalFilePurger {
         self.scheduler.stop(true).await
     }
 
+    /// Registers one more reference to the index file `index_id`.
+    pub fn add_index_reference(&self, index_id: RegionIndexId) {
+        self.index_refs
+            .entry(index_id)
+            .and_modify(|info| info.ref_count += 1)
+            .or_insert(IndexRefInfo {
+                ref_count: 1,
+                is_deleted: false,
+            });
+    }
+
+    /// Releases one reference to the index file `index_id`.
+    ///
+    /// Returns true when this was the last reference and the index had been marked as
+    /// deleted, in which case the caller is expected to delete the index file. Releasing
+    /// an index that is not tracked does nothing and returns false.
+    pub fn remove_index_reference(&self, index_id: &RegionIndexId) -> bool {
+        use dashmap::mapref::entry::Entry;
+
+        // Decrement and removal go through a single map entry so that a concurrent
+        // `add_index_reference` cannot slip in between them and lose its reference.
+        match self.index_refs.entry(*index_id) {
+            Entry::Occupied(mut entry) => {
+                let info = entry.get_mut();
+                // Saturate instead of underflowing on an unbalanced release.
+                info.ref_count = info.ref_count.saturating_sub(1);
+                if info.ref_count > 0 {
+                    return false;
+                }
+                entry.remove().is_deleted
+            }
+            Entry::Vacant(_) => false,
+        }
+    }
+
+    /// Marks the index file `index_id` as deleted without deleting it.
+    ///
+    /// Returns true when nothing references the index, in which case the caller is
+    /// expected to delete the index file right away. Otherwise the flag is recorded and
+    /// `remove_index_reference` reports the deletion once the last reference is released.
+    pub fn mark_index_deleted(&self, index_id: &RegionIndexId) -> bool {
+        use dashmap::mapref::entry::Entry;
+
+        match self.index_refs.entry(*index_id) {
+            Entry::Occupied(mut entry) => {
+                if entry.get().ref_count == 0 {
+                    entry.remove();
+                    true
+                } else {
+                    entry.get_mut().is_deleted = true;
+                    false
+                }
+            }
+            // An untracked index has no references.
+            Entry::Vacant(_) => true,
+        }
+    }
+
+    /// Marks the index version as deleted and schedules its deletion if it is unreferenced.
+    fn check_and_delete_index(&self, file_meta: FileMeta, index_version: IndexVersion) {
+        let index_id = RegionIndexId::new(file_meta.file_id(), index_version);
+        if self.mark_index_deleted(&index_id) {
+            self.delete_index(file_meta, index_version);
+        }
+    }
+
     /// Deletes the file(and it's index, if any) from cache and storage.
     fn delete_file(&self, file_meta: FileMeta) {
         let sst_layer = self.sst_layer.clone();
@@ -158,9 +235,18 @@ impl LocalFilePurger {
             return;
         }
         let sst_layer = self.sst_layer.clone();
+        let cache_manager = self.cache_manager.clone();
         if let Err(e) = self.scheduler.schedule(Box::pin(async move {
             let index_id = RegionIndexId::new(file_meta.file_id(), old_version);
-            if let Err(e) = sst_layer.delete_index(&index_id).await {
+            if let Err(e) = delete_index(
+                file_meta.region_id,
+                file_meta.file_id,
+                old_version,
+                &sst_layer,
+                &cache_manager,
+            )
+            .await
+            {
                 error!(e; "Failed to delete index {:?} from storage", index_id);
             }
         })) {
@@ -171,13 +257,34 @@ impl LocalFilePurger {
 
 impl FilePurger for LocalFilePurger {
     fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
+        // Release the reference this file held on its current index version, if any.
+        if file_meta.exists_index() {
+            let index_id = RegionIndexId::new(file_meta.file_id(), file_meta.index_version);
+            let should_delete = self.remove_index_reference(&index_id);
+
+            // The index was marked for deletion and this was its last reference.
+            if should_delete {
+                self.delete_index(file_meta.clone(), file_meta.index_version);
+            }
+        }
+
         if is_delete {
             self.delete_file(file_meta);
         }
     }
 
     fn update_index(&self, file_meta: FileMeta, old_version: IndexVersion) {
-        self.delete_index(file_meta, old_version);
+        // Mark the old index as deleted; it is deleted now only if nothing references it.
+        self.check_and_delete_index(file_meta, old_version);
+    }
+
+    fn new_file(&self, file_meta: &FileMeta) {
+        if file_meta.exists_index() {
+            self.add_index_reference(RegionIndexId::new(
+                file_meta.file_id(),
+                file_meta.index_version,
+            ));
+        }
     }
 }
 
@@ -363,4 +470,470 @@ mod tests {
         assert!(!object_store.exists(&path).await.unwrap());
         assert!(!object_store.exists(&index_path).await.unwrap());
     }
+
+    /// Creates a purger backed by a local-filesystem object store rooted at `root`.
+    async fn new_test_purger(
+        root: &std::path::Path,
+        scheduler: Arc<LocalScheduler>,
+    ) -> (ObjectStore, Arc<LocalFilePurger>) {
+        let dir_path = root.display().to_string();
+        let builder = Fs::default().root(&dir_path);
+        let object_store = ObjectStore::new(builder).unwrap().finish();
+
+        let index_aux_path = root.join("index_aux");
+        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
+            .await
+            .unwrap();
+        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
+            .await
+            .unwrap();
+
+        let layer = Arc::new(AccessLayer::new(
+            "table1",
+            PathType::Bare,
+            object_store.clone(),
+            puffin_mgr,
+            intm_mgr,
+        ));
+        let file_purger = Arc::new(LocalFilePurger::new(scheduler, layer, None));
+        (object_store, file_purger)
+    }
+
+    /// Builds the metadata of an SST that has an inverted index of version `index_version`.
+    fn file_meta_with_index(file_id: RegionFileId, index_version: u64) -> FileMeta {
+        FileMeta {
+            region_id: file_id.region_id(),
+            file_id: file_id.file_id(),
+            time_range: FileTimeRange::default(),
+            level: 0,
+            file_size: 4096,
+            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            indexes: vec![ColumnIndexMetadata {
+                column_id: 0,
+                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
+            }],
+            index_file_size: 4096,
+            index_version,
+            num_rows: 1024,
+            num_row_groups: 1,
+            sequence: NonZeroU64::new(4096),
+            partition_expr: None,
+            num_series: 0,
+        }
+    }
+
+    // Tests for Index Reference Counting Logic
+
+    #[tokio::test]
+    async fn test_add_index_reference() {
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-add-index-ref");
+        let (_object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        // Test adding a new index reference
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+
+        file_purger.add_index_reference(index_id);
+
+        {
+            // Check that the reference was added
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert_eq!(info.ref_count, 1);
+            assert!(!info.is_deleted);
+        }
+
+        // Test adding another reference to the same index
+        file_purger.add_index_reference(index_id);
+
+        {
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert_eq!(info.ref_count, 2);
+            assert!(!info.is_deleted);
+        }
+
+        scheduler.stop(true).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_remove_index_reference() {
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-remove-index-ref");
+        let (_object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        // Add two references to the same index.
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+        file_purger.add_index_reference(index_id);
+        file_purger.add_index_reference(index_id);
+
+        // One reference is left, so nothing must be deleted.
+        assert!(!file_purger.remove_index_reference(&index_id));
+        {
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert_eq!(info.ref_count, 1);
+        }
+
+        // The last reference is gone, but the index was never marked as deleted.
+        assert!(!file_purger.remove_index_reference(&index_id));
+        assert!(file_purger.index_refs.get(&index_id).is_none());
+
+        // Releasing an untracked index is a no-op.
+        assert!(!file_purger.remove_index_reference(&index_id));
+
+        scheduler.stop(true).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_remove_index_reference_with_deleted_flag() {
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-remove-index-ref-deleted");
+        let (_object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+        file_purger.add_index_reference(index_id);
+
+        // The index is still referenced, so marking it must not trigger a deletion.
+        assert!(!file_purger.mark_index_deleted(&index_id));
+
+        // Releasing the last reference of a marked index reports the deletion.
+        assert!(file_purger.remove_index_reference(&index_id));
+        assert!(file_purger.index_refs.get(&index_id).is_none());
+
+        scheduler.stop(true).await.unwrap();
+    }
+
+    // Tests for Deletion Logic
+
+    #[tokio::test]
+    async fn test_mark_index_deleted() {
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-mark-index-deleted");
+        let (_object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        // An untracked index has no references and can be deleted immediately.
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+        assert!(file_purger.mark_index_deleted(&index_id));
+
+        // A referenced index is only flagged.
+        file_purger.add_index_reference(index_id);
+        assert!(!file_purger.mark_index_deleted(&index_id));
+        {
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert!(info.is_deleted);
+            assert_eq!(info.ref_count, 1);
+        }
+
+        scheduler.stop(true).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_check_and_delete_index() {
+        common_telemetry::init_default_ut_logging();
+
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-check-delete-index");
+        let (object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        // Create an index file
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+        let index_path = location::index_file_path("table1", index_id, PathType::Bare);
+        object_store
+            .write(&index_path, vec![0; 4096])
+            .await
+            .unwrap();
+        let file_meta = file_meta_with_index(file_id, 1);
+
+        // While the index is referenced it is only marked as deleted.
+        file_purger.add_index_reference(index_id);
+        file_purger.check_and_delete_index(file_meta.clone(), 1);
+        assert!(object_store.exists(&index_path).await.unwrap());
+
+        // Once the reference is released the deletion gets scheduled.
+        file_purger.remove_index_reference(&index_id);
+        file_purger.check_and_delete_index(file_meta, 1);
+
+        // Wait for the scheduled jobs instead of sleeping.
+        scheduler.stop(true).await.unwrap();
+
+        assert!(!object_store.exists(&index_path).await.unwrap());
+    }
+
+    // Tests for FileHandle Lifecycle Integration
+
+    #[tokio::test]
+    async fn test_file_handle_lifecycle_with_index() {
+        common_telemetry::init_default_ut_logging();
+
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-file-handle-lifecycle");
+        let (object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        // Create a file with index
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+        let file_path = location::sst_file_path("table1", file_id, PathType::Bare);
+        let index_path = location::index_file_path("table1", index_id, PathType::Bare);
+        object_store.write(&file_path, vec![0; 4096]).await.unwrap();
+        object_store
+            .write(&index_path, vec![0; 4096])
+            .await
+            .unwrap();
+
+        // Create FileHandle - this should add a reference
+        let handle = FileHandle::new(file_meta_with_index(file_id, 1), file_purger.clone());
+        {
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert_eq!(info.ref_count, 1);
+            assert!(!info.is_deleted);
+        }
+
+        // Cloning shares the same FileHandleInner, so the reference count stays at 1.
+        let handle2 = handle.clone();
+        {
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert_eq!(info.ref_count, 1);
+        }
+
+        // Mark file as deleted
+        handle.mark_deleted();
+
+        // Drop one handle - should not delete yet
+        drop(handle);
+        assert!(object_store.exists(&file_path).await.unwrap());
+        assert!(object_store.exists(&index_path).await.unwrap());
+
+        // Drop the second handle - should delete now
+        drop(handle2);
+
+        // Wait for the scheduled jobs instead of sleeping.
+        scheduler.stop(true).await.unwrap();
+
+        assert!(!object_store.exists(&file_path).await.unwrap());
+        assert!(!object_store.exists(&index_path).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_update_index_version() {
+        common_telemetry::init_default_ut_logging();
+
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-update-index-version");
+        let (object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        // Create both the old (version 1) and the new (version 2) index files.
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let old_index_id = RegionIndexId::new(file_id, 1);
+        let new_index_id = RegionIndexId::new(file_id, 2);
+        let old_index_path = location::index_file_path("table1", old_index_id, PathType::Bare);
+        let new_index_path = location::index_file_path("table1", new_index_id, PathType::Bare);
+        object_store
+            .write(&old_index_path, vec![0; 4096])
+            .await
+            .unwrap();
+        object_store
+            .write(&new_index_path, vec![0; 4096])
+            .await
+            .unwrap();
+
+        // The handle is created with the new index version and references only that one.
+        let file_meta = file_meta_with_index(file_id, 2);
+        let handle = FileHandle::new(file_meta.clone(), file_purger.clone());
+        {
+            let info = file_purger.index_refs.get(&new_index_id).unwrap();
+            assert_eq!(info.ref_count, 1);
+            assert!(!info.is_deleted);
+        }
+
+        // The old index has no references, so it is not tracked after the update.
+        file_purger.update_index(file_meta, 1);
+        assert!(file_purger.index_refs.get(&old_index_id).is_none());
+
+        // New index should still have its reference
+        {
+            let info = file_purger.index_refs.get(&new_index_id).unwrap();
+            assert_eq!(info.ref_count, 1);
+            assert!(!info.is_deleted);
+        }
+
+        // Drop the handle
+        drop(handle);
+
+        // Wait for the scheduled jobs instead of sleeping.
+        scheduler.stop(true).await.unwrap();
+
+        // New index should still exist as it was the current one
+        assert!(object_store.exists(&new_index_path).await.unwrap());
+    }
+
+    // Tests for Edge Cases and Error Conditions
+
+    #[tokio::test]
+    async fn test_concurrent_index_operations() {
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-concurrent-index-ops");
+        let (_object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+
+        // Spawn multiple tasks to add references concurrently
+        let mut tasks = vec![];
+        for _ in 0..10 {
+            let purger = file_purger.clone();
+            tasks.push(tokio::spawn(async move {
+                purger.add_index_reference(index_id);
+            }));
+        }
+        for task in tasks {
+            task.await.unwrap();
+        }
+
+        // Check that all 10 references were added
+        {
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert_eq!(info.ref_count, 10);
+            assert!(!info.is_deleted);
+        }
+
+        // Spawn multiple tasks to remove references concurrently
+        let mut tasks = vec![];
+        for _ in 0..10 {
+            let purger = file_purger.clone();
+            tasks.push(tokio::spawn(async move {
+                purger.remove_index_reference(&index_id)
+            }));
+        }
+        for task in tasks {
+            // The index was never marked as deleted, so no release asks for a deletion.
+            assert!(!task.await.unwrap());
+        }
+
+        // Index should be removed as ref_count is 0
+        assert!(file_purger.index_refs.get(&index_id).is_none());
+
+        scheduler.stop(true).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_multiple_file_handles_same_index() {
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-multiple-handles-same-index");
+        let (_object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+        let file_meta = file_meta_with_index(file_id, 1);
+
+        // Create multiple FileHandles for the same file
+        let handle1 = FileHandle::new(file_meta.clone(), file_purger.clone());
+        let handle2 = FileHandle::new(file_meta.clone(), file_purger.clone());
+        let handle3 = FileHandle::new(file_meta, file_purger.clone());
+
+        // Each FileHandle should add a reference
+        {
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert_eq!(info.ref_count, 3);
+        }
+
+        // Mark all handles as deleted
+        handle1.mark_deleted();
+        handle2.mark_deleted();
+        handle3.mark_deleted();
+
+        // Each dropped handle releases exactly one reference.
+        drop(handle1);
+        {
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert_eq!(info.ref_count, 2);
+        }
+        drop(handle2);
+        {
+            let info = file_purger.index_refs.get(&index_id).unwrap();
+            assert_eq!(info.ref_count, 1);
+        }
+
+        // Dropping the last handle removes the index from the map.
+        drop(handle3);
+        assert!(file_purger.index_refs.get(&index_id).is_none());
+
+        scheduler.stop(true).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_error_handling_during_deletion() {
+        common_telemetry::init_default_ut_logging();
+
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-error-handling-deletion");
+        let (object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        // Create a file with index
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+        let file_path = location::sst_file_path("table1", file_id, PathType::Bare);
+        let index_path = location::index_file_path("table1", index_id, PathType::Bare);
+        object_store.write(&file_path, vec![0; 4096]).await.unwrap();
+        object_store
+            .write(&index_path, vec![0; 4096])
+            .await
+            .unwrap();
+
+        let handle = FileHandle::new(file_meta_with_index(file_id, 1), file_purger.clone());
+
+        // Remove the index file behind the purger's back so the purge finds it missing.
+        object_store.delete(&index_path).await.unwrap();
+
+        // Mark file as deleted and drop handle
+        handle.mark_deleted();
+        drop(handle);
+
+        // Wait for the scheduled jobs instead of sleeping.
+        scheduler.stop(true).await.unwrap();
+
+        // The SST file must be deleted even though its index file was already gone.
+        assert!(!object_store.exists(&file_path).await.unwrap());
+    }
+
+    // Tests for Integration with Storage Layer
+
+    #[tokio::test]
+    async fn test_local_file_system_integration() {
+        common_telemetry::init_default_ut_logging();
+
+        let scheduler = Arc::new(LocalScheduler::new(3));
+        let dir = create_temp_dir("test-local-fs-integration");
+        let (object_store, file_purger) = new_test_purger(dir.path(), scheduler.clone()).await;
+
+        // Create a file with index
+        let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
+        let index_id = RegionIndexId::new(file_id, 1);
+        let file_path = location::sst_file_path("table1", file_id, PathType::Bare);
+        let index_path = location::index_file_path("table1", index_id, PathType::Bare);
+        object_store.write(&file_path, vec![0; 4096]).await.unwrap();
+        object_store
+            .write(&index_path, vec![0; 4096])
+            .await
+            .unwrap();
+
+        let handle = FileHandle::new(file_meta_with_index(file_id, 1), file_purger.clone());
+
+        // Verify files exist
+        assert!(object_store.exists(&file_path).await.unwrap());
+        assert!(object_store.exists(&index_path).await.unwrap());
+
+        // Mark file as deleted and drop handle
+        handle.mark_deleted();
+        drop(handle);
+
+        // Wait for the scheduled jobs instead of sleeping.
+        scheduler.stop(true).await.unwrap();
+
+        // Verify files are deleted
+        assert!(!object_store.exists(&file_path).await.unwrap());
+        assert!(!object_store.exists(&index_path).await.unwrap());
+    }
 }