diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index ea7cbaca97..cde76c1abb 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -228,22 +228,25 @@ impl AccessLayer { // Delete all versions of the index file. for version in 0..=index_file_id.version { - let path = location::index_file_path( - &self.table_dir, - RegionIndexId::new(index_file_id.file_id, version), - self.path_type, - ); - self.object_store - .delete(&path) - .await - .context(DeleteIndexSnafu { - file_id: region_file_id.file_id(), - })?; + self.delete_index(&RegionIndexId::new(index_file_id.file_id, version)) + .await?; } Ok(()) } + pub(crate) async fn delete_index(&self, region_index_id: &RegionIndexId) -> Result<()> { + let path = location::index_file_path(&self.table_dir, *region_index_id, self.path_type); + self.object_store + .delete(&path) + .await + .context(DeleteIndexSnafu { + file_id: region_index_id.file_id(), + })?; + + Ok(()) + } + /// Returns the directory of the region in the table. pub fn build_region_dir(&self, region_id: RegionId) -> String { region_dir_from_table_dir(&self.table_dir, region_id, self.path_type) diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index fd405896b0..8ef82728b6 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -21,7 +21,7 @@ use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; use crate::error::Result; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::{FileMeta, delete_files}; +use crate::sst::file::{FileMeta, IndexVersion, RegionIndexId, delete_files}; use crate::sst::file_ref::FileReferenceManagerRef; /// A worker to delete files in background. @@ -31,6 +31,10 @@ pub trait FilePurger: Send + Sync + fmt::Debug { /// Otherwise, only the reference will be removed. fn remove_file(&self, file_meta: FileMeta, is_delete: bool); + /// Remove the index with given version. Notice that this only removes one index file + /// given by `version`, ignore the version in `file_meta`. + fn remove_index(&self, file_meta: FileMeta, version: IndexVersion, is_delete: bool); + /// Notify the purger of a new file created. /// This is useful for object store based storage, where we need to track the file references /// The default implementation is a no-op. @@ -49,6 +53,10 @@ impl FilePurger for NoopFilePurger { fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) { // noop } + + fn remove_index(&self, _file_meta: FileMeta, _version: IndexVersion, _is_delete: bool) { + // noop + } } /// Purger that purges file for current region. @@ -142,6 +150,18 @@ impl LocalFilePurger { error!(e; "Failed to schedule the file purge request"); } } + + fn delete_index(&self, file_meta: FileMeta, version: IndexVersion) { + let sst_layer = self.sst_layer.clone(); + if let Err(e) = self.scheduler.schedule(Box::pin(async move { + let index_id = RegionIndexId::new(file_meta.file_id(), version); + if let Err(e) = sst_layer.delete_index(&index_id).await { + error!(e; "Failed to delete index {:?} from storage", index_id); + } + })) { + error!(e; "Failed to schedule the index purge request"); + } + } } impl FilePurger for LocalFilePurger { @@ -150,6 +170,12 @@ impl FilePurger for LocalFilePurger { self.delete_file(file_meta); } } + + fn remove_index(&self, file_meta: FileMeta, version: IndexVersion, is_delete: bool) { + if is_delete { + self.delete_index(file_meta, version); + } + } } #[derive(Debug)] @@ -166,6 +192,10 @@ impl FilePurger for ObjectStoreFilePurger { // TODO(discord9): consider impl a .tombstone file to reduce files needed to list } + fn remove_index(&self, _file_meta: FileMeta, _version: IndexVersion, _is_delete: bool) { + // TODO(discord9): add index reference management for object store based storage + } + fn new_file(&self, file_meta: &FileMeta) { self.file_ref_manager.add_file(file_meta); } diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 12f3d9f1d7..22a6333ace 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -801,6 +801,11 @@ impl IndexBuildTask { self.file_meta.region_id, self.reason.as_str() ); + // notify the file purger to remove the old index files if any + if new_index_version > 0 { + self.file_purger + .remove_index(self.file_meta.clone(), new_index_version - 1, true); + } Ok(edit) } }