From dbeba421db6b4e5999da82c0cb55a3b2b943b96b Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 10 Dec 2025 16:25:22 +0800 Subject: [PATCH] feat: mark index outdated Signed-off-by: discord9 --- src/cmd/src/datanode/objbench.rs | 2 +- src/mito2/src/access_layer.rs | 31 ++++-- src/mito2/src/sst/file.rs | 111 ++++++++++++++----- src/mito2/src/sst/file_purger.rs | 26 ++++- src/mito2/src/sst/index.rs | 53 ++++++--- src/mito2/src/sst/version.rs | 10 +- src/mito2/src/worker/handle_rebuild_index.rs | 1 + 7 files changed, 171 insertions(+), 63 deletions(-) diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index bdb12bcbcb..61b47552eb 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -564,7 +564,7 @@ fn new_noop_file_purger() -> FilePurgerRef { #[derive(Debug)] struct Noop; impl FilePurger for Noop { - fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {} + fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {} } Arc::new(Noop) } diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index ea7cbaca97..8888ade815 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -228,22 +228,31 @@ impl AccessLayer { // Delete all versions of the index file. for version in 0..=index_file_id.version { - let path = location::index_file_path( - &self.table_dir, - RegionIndexId::new(index_file_id.file_id, version), - self.path_type, - ); - self.object_store - .delete(&path) - .await - .context(DeleteIndexSnafu { - file_id: region_file_id.file_id(), - })?; + let index_id = RegionIndexId::new(*region_file_id, version); + self.delete_index(index_id).await?; } Ok(()) } + pub(crate) async fn delete_index( + &self, + index_file_id: RegionIndexId, + ) -> Result<(), crate::error::Error> { + let path = location::index_file_path( + &self.table_dir, + RegionIndexId::new(index_file_id.file_id, index_file_id.version), + self.path_type, + ); + self.object_store + .delete(&path) + .await + .context(DeleteIndexSnafu { + file_id: index_file_id.file_id(), + })?; + Ok(()) + } + /// Returns the directory of the region in the table. pub fn build_region_dir(&self, region_id: RegionId) -> String { region_dir_from_table_dir(&self.table_dir, region_id, self.path_type) diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index bd12720011..05995123c5 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -434,6 +434,16 @@ impl FileHandle { self.inner.compacting.store(compacting, Ordering::Relaxed); } + pub fn index_outdated(&self) -> bool { + self.inner.index_outdated.load(Ordering::Relaxed) + } + + pub fn set_index_outdated(&self, index_outdated: bool) { + self.inner + .index_outdated + .store(index_outdated, Ordering::Relaxed); + } + /// Returns a reference to the [FileMeta]. pub fn meta_ref(&self) -> &FileMeta { &self.inner.meta @@ -471,23 +481,29 @@ struct FileHandleInner { meta: FileMeta, compacting: AtomicBool, deleted: AtomicBool, + index_outdated: AtomicBool, file_purger: FilePurgerRef, } impl Drop for FileHandleInner { fn drop(&mut self) { - self.file_purger - .remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed)); + self.file_purger.remove_file( + self.meta.clone(), + self.deleted.load(Ordering::Acquire), + self.index_outdated.load(Ordering::Acquire), + ); } } impl FileHandleInner { + /// There should only be one `FileHandleInner` for each file on a datanode fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner { file_purger.new_file(&meta); FileHandleInner { meta, compacting: AtomicBool::new(false), deleted: AtomicBool::new(false), + index_outdated: AtomicBool::new(false), file_purger, } } @@ -540,38 +556,77 @@ pub async fn delete_files( ); for (file_id, index_version) in file_ids { - if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) { - // Removes index file from the cache. - if delete_index { - write_cache - .remove(IndexKey::new( - region_id, - *file_id, - FileType::Puffin(*index_version), - )) - .await; - } + purge_index_cache_stager( + region_id, + delete_index, + access_layer, + cache_manager, + *file_id, + *index_version, + ) + .await; + } + Ok(()) +} - // Remove the SST file from the cache. +pub async fn delete_index( + region_index_id: RegionIndexId, + access_layer: &AccessLayerRef, + cache_manager: &Option, +) -> crate::error::Result<()> { + access_layer.delete_index(region_index_id).await?; + + purge_index_cache_stager( + region_index_id.region_id(), + true, + access_layer, + cache_manager, + region_index_id.file_id(), + region_index_id.version, + ) + .await; + + Ok(()) +} + +async fn purge_index_cache_stager( + region_id: RegionId, + delete_index: bool, + access_layer: &AccessLayerRef, + cache_manager: &Option, + file_id: FileId, + index_version: u64, +) { + if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) { + // Removes index file from the cache. + if delete_index { write_cache - .remove(IndexKey::new(region_id, *file_id, FileType::Parquet)) + .remove(IndexKey::new( + region_id, + file_id, + FileType::Puffin(index_version), + )) .await; } - // Purges index content in the stager. - if let Err(e) = access_layer - .puffin_manager_factory() - .purge_stager(RegionIndexId::new( - RegionFileId::new(region_id, *file_id), - *index_version, - )) - .await - { - error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}", - file_id, index_version, region_id); - } + // Remove the SST file from the cache. + write_cache + .remove(IndexKey::new(region_id, file_id, FileType::Parquet)) + .await; + } + + // Purges index content in the stager. + if let Err(e) = access_layer + .puffin_manager_factory() + .purge_stager(RegionIndexId::new( + RegionFileId::new(region_id, file_id), + index_version, + )) + .await + { + error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}", + file_id, index_version, region_id); } - Ok(()) } #[cfg(test)] diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index fd405896b0..2715223a51 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -21,7 +21,7 @@ use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; use crate::error::Result; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::{FileMeta, delete_files}; +use crate::sst::file::{FileMeta, delete_files, delete_index}; use crate::sst::file_ref::FileReferenceManagerRef; /// A worker to delete files in background. @@ -29,7 +29,8 @@ pub trait FilePurger: Send + Sync + fmt::Debug { /// Send a request to remove the file. /// If `is_delete` is true, the file will be deleted from the storage. /// Otherwise, only the reference will be removed. - fn remove_file(&self, file_meta: FileMeta, is_delete: bool); + /// If `index_outdated` is true, the index file will be deleted regardless of `is_delete`. + fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool); /// Notify the purger of a new file created. /// This is useful for object store based storage, where we need to track the file references @@ -46,7 +47,7 @@ pub type FilePurgerRef = Arc; pub struct NoopFilePurger; impl FilePurger for NoopFilePurger { - fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) { + fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) { // noop } } @@ -142,12 +143,27 @@ impl LocalFilePurger { error!(e; "Failed to schedule the file purge request"); } } + + fn delete_index(&self, file_meta: FileMeta) { + let sst_layer = self.sst_layer.clone(); + let cache_manager = self.cache_manager.clone(); + if let Err(e) = self.scheduler.schedule(Box::pin(async move { + let index_id = file_meta.index_id(); + if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await { + error!(e; "Failed to delete index for file {:?} from storage", file_meta); + } + })) { + error!(e; "Failed to schedule the index purge request"); + } + } } impl FilePurger for LocalFilePurger { - fn remove_file(&self, file_meta: FileMeta, is_delete: bool) { + fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) { if is_delete { self.delete_file(file_meta); + } else if index_outdated { + self.delete_index(file_meta); } } } @@ -158,7 +174,7 @@ pub struct ObjectStoreFilePurger { } impl FilePurger for ObjectStoreFilePurger { - fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) { + fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) { // if not on local file system, instead inform the global file purger to remove the file reference. // notice that no matter whether the file is deleted or not, we need to remove the reference // because the file is no longer in use nonetheless. diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 12f3d9f1d7..c44535a895 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -503,6 +503,8 @@ pub type ResultMpscSender = Sender>; #[derive(Clone)] pub struct IndexBuildTask { + /// The SST file handle to build index for. + pub file: FileHandle, /// The file meta to build index for. pub file_meta: FileMeta, pub reason: IndexBuildType, @@ -651,10 +653,7 @@ impl IndexBuildTask { let mut parquet_reader = self .access_layer - .read_sst(FileHandle::new( - self.file_meta.clone(), - self.file_purger.clone(), - )) + .read_sst(self.file.clone()) // use the latest file handle instead of creating a new one .build() .await?; @@ -1498,14 +1497,19 @@ mod tests { let region_id = metadata.region_id; let indexer_builder = mock_indexer_builder(metadata, &env).await; + let file_meta = FileMeta { + region_id, + file_id: FileId::random(), + file_size: 100, + ..Default::default() + }; + + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let task = IndexBuildTask { - file_meta: FileMeta { - region_id, - file_id: FileId::random(), - file_size: 100, - ..Default::default() - }, + file, + file_meta, reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), listener: WorkerListener::default(), @@ -1555,10 +1559,13 @@ mod tests { mock_version_control(metadata.clone(), file_purger.clone(), files).await; let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await; + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let (tx, mut rx) = mpsc::channel(4); let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { + file, file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), @@ -1626,10 +1633,13 @@ mod tests { mock_version_control(metadata.clone(), file_purger.clone(), files).await; let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await; + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let (tx, _rx) = mpsc::channel(4); let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { + file, file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), @@ -1726,10 +1736,13 @@ mod tests { mock_version_control(metadata.clone(), file_purger.clone(), files).await; let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await; + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let (tx, mut rx) = mpsc::channel(4); let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { + file, file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), @@ -1813,10 +1826,13 @@ mod tests { let version_control = mock_version_control(metadata.clone(), file_purger.clone(), files).await; + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let (tx, mut _rx) = mpsc::channel(4); let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { + file, file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), @@ -1864,13 +1880,18 @@ mod tests { let (tx, _rx) = mpsc::channel(4); let (result_tx, _result_rx) = mpsc::channel::>(4); + let file_meta = FileMeta { + region_id, + file_id, + file_size: 100, + ..Default::default() + }; + + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + IndexBuildTask { - file_meta: FileMeta { - region_id, - file_id, - file_size: 100, - ..Default::default() - }, + file, + file_meta, reason, access_layer: env.access_layer.clone(), listener: WorkerListener::default(), diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index 6cae6ce83d..da78606cdb 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -57,9 +57,15 @@ impl SstVersion { ) { for file in files_to_add { let level = file.level; - self.levels[level as usize] + let new_index_version = file.index_version; + if let Some(old_file_handle) = self.levels[level as usize] .files - .insert(file.file_id, FileHandle::new(file, file_purger.clone())); + .insert(file.file_id, FileHandle::new(file, file_purger.clone())) + && old_file_handle.index_id().version < new_index_version + { + // now the old file handle's index is outdated + old_file_handle.set_index_outdated(true); + } } } diff --git a/src/mito2/src/worker/handle_rebuild_index.rs b/src/mito2/src/worker/handle_rebuild_index.rs index 54f7f8abaf..ed2390d853 100644 --- a/src/mito2/src/worker/handle_rebuild_index.rs +++ b/src/mito2/src/worker/handle_rebuild_index.rs @@ -72,6 +72,7 @@ impl RegionWorkerLoop { }); IndexBuildTask { + file: file.clone(), file_meta: file.meta_ref().clone(), reason: build_type, access_layer: access_layer.clone(),