mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: delete index files properly
Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -228,22 +228,25 @@ impl AccessLayer {
|
||||
|
||||
// Delete all versions of the index file.
|
||||
for version in 0..=index_file_id.version {
|
||||
let path = location::index_file_path(
|
||||
&self.table_dir,
|
||||
RegionIndexId::new(index_file_id.file_id, version),
|
||||
self.path_type,
|
||||
);
|
||||
self.object_store
|
||||
.delete(&path)
|
||||
.await
|
||||
.context(DeleteIndexSnafu {
|
||||
file_id: region_file_id.file_id(),
|
||||
})?;
|
||||
self.delete_index(&RegionIndexId::new(index_file_id.file_id, version))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_index(&self, region_index_id: &RegionIndexId) -> Result<()> {
|
||||
let path = location::index_file_path(&self.table_dir, *region_index_id, self.path_type);
|
||||
self.object_store
|
||||
.delete(&path)
|
||||
.await
|
||||
.context(DeleteIndexSnafu {
|
||||
file_id: region_index_id.file_id(),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the directory of the region in the table.
|
||||
pub fn build_region_dir(&self, region_id: RegionId) -> String {
|
||||
region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::access_layer::AccessLayerRef;
|
||||
use crate::cache::CacheManagerRef;
|
||||
use crate::error::Result;
|
||||
use crate::schedule::scheduler::SchedulerRef;
|
||||
use crate::sst::file::{FileMeta, delete_files};
|
||||
use crate::sst::file::{FileMeta, IndexVersion, RegionIndexId, delete_files};
|
||||
use crate::sst::file_ref::FileReferenceManagerRef;
|
||||
|
||||
/// A worker to delete files in background.
|
||||
@@ -31,6 +31,10 @@ pub trait FilePurger: Send + Sync + fmt::Debug {
|
||||
/// Otherwise, only the reference will be removed.
|
||||
fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
|
||||
|
||||
/// Remove the index with given version. Notice that this only removes one index file
|
||||
/// given by `version`, ignore the version in `file_meta`.
|
||||
fn remove_index(&self, file_meta: FileMeta, version: IndexVersion, is_delete: bool);
|
||||
|
||||
/// Notify the purger of a new file created.
|
||||
/// This is useful for object store based storage, where we need to track the file references
|
||||
/// The default implementation is a no-op.
|
||||
@@ -49,6 +53,10 @@ impl FilePurger for NoopFilePurger {
|
||||
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
|
||||
// noop
|
||||
}
|
||||
|
||||
fn remove_index(&self, _file_meta: FileMeta, _version: IndexVersion, _is_delete: bool) {
|
||||
// noop
|
||||
}
|
||||
}
|
||||
|
||||
/// Purger that purges file for current region.
|
||||
@@ -142,6 +150,18 @@ impl LocalFilePurger {
|
||||
error!(e; "Failed to schedule the file purge request");
|
||||
}
|
||||
}
|
||||
|
||||
fn delete_index(&self, file_meta: FileMeta, version: IndexVersion) {
|
||||
let sst_layer = self.sst_layer.clone();
|
||||
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
|
||||
let index_id = RegionIndexId::new(file_meta.file_id(), version);
|
||||
if let Err(e) = sst_layer.delete_index(&index_id).await {
|
||||
error!(e; "Failed to delete index {:?} from storage", index_id);
|
||||
}
|
||||
})) {
|
||||
error!(e; "Failed to schedule the index purge request");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FilePurger for LocalFilePurger {
|
||||
@@ -150,6 +170,12 @@ impl FilePurger for LocalFilePurger {
|
||||
self.delete_file(file_meta);
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_index(&self, file_meta: FileMeta, version: IndexVersion, is_delete: bool) {
|
||||
if is_delete {
|
||||
self.delete_index(file_meta, version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -166,6 +192,10 @@ impl FilePurger for ObjectStoreFilePurger {
|
||||
// TODO(discord9): consider impl a .tombstone file to reduce files needed to list
|
||||
}
|
||||
|
||||
fn remove_index(&self, _file_meta: FileMeta, _version: IndexVersion, _is_delete: bool) {
|
||||
// TODO(discord9): add index reference management for object store based storage
|
||||
}
|
||||
|
||||
fn new_file(&self, file_meta: &FileMeta) {
|
||||
self.file_ref_manager.add_file(file_meta);
|
||||
}
|
||||
|
||||
@@ -801,6 +801,11 @@ impl IndexBuildTask {
|
||||
self.file_meta.region_id,
|
||||
self.reason.as_str()
|
||||
);
|
||||
// notify the file purger to remove the old index files if any
|
||||
if new_index_version > 0 {
|
||||
self.file_purger
|
||||
.remove_index(self.file_meta.clone(), new_index_version - 1, true);
|
||||
}
|
||||
Ok(edit)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user