feat: track index?

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-12-10 14:09:10 +08:00
parent c82208dbc0
commit 0cd368d1f6
3 changed files with 987 additions and 30 deletions

View File

@@ -37,7 +37,7 @@ use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::{FlatSource, Source};
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId, delete_index};
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};

View File

@@ -538,38 +538,89 @@ pub async fn delete_files(
);
for (file_id, index_version) in file_ids {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
.remove(IndexKey::new(
region_id,
*file_id,
FileType::Puffin(*index_version),
))
.await;
}
purge_index_write_cache_stager(
region_id,
delete_index,
access_layer,
cache_manager,
file_id,
index_version,
)
.await;
}
Ok(())
}
// Remove the SST file from the cache.
/// Deletes one version of an SST's index file and evicts it from the caches.
///
/// The index identified by `region_id`, `file_id` and `index_version` is removed through
/// `access_layer`. A failure there is only logged; the write-cache entry and the stager content
/// for that index are purged afterwards regardless.
///
/// As written, this function always returns `Ok(())`.
pub async fn delete_index(
    region_id: RegionId,
    file_id: FileId,
    index_version: u64,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
    let deletion = access_layer.delete_index(&index_id).await;
    if let Err(err) = deletion {
        // Best effort: keep going so the cached copies are still dropped.
        error!(err; "Failed to delete index file for {}/{}.{}",
            region_id, file_id, index_version);
    }

    // `true` asks the helper to also evict the puffin entry from the write cache.
    purge_index_write_cache_stager(
        region_id,
        true,
        access_layer,
        cache_manager,
        &file_id,
        &index_version,
    )
    .await;

    Ok(())
}
pub async fn purge_index_write_cache_stager(
region_id: RegionId,
delete_index: bool,
access_layer: &Arc<crate::access_layer::AccessLayer>,
cache_manager: &Option<Arc<crate::cache::CacheManager>>,
file_id: &FileId,
index_version: &u64,
) {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
.remove(IndexKey::new(
region_id,
*file_id,
FileType::Puffin(*index_version),
))
.await;
}
// Purges index content in the stager.
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionIndexId::new(
RegionFileId::new(region_id, *file_id),
*index_version,
))
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
file_id, index_version, region_id);
}
// Remove the SST file from the cache.
write_cache
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
.await;
}
// Purges index content in the stager.
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionIndexId::new(
RegionFileId::new(region_id, *file_id),
*index_version,
))
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
file_id, index_version, region_id);
}
Ok(())
}
#[cfg(test)]

View File

@@ -16,15 +16,23 @@ use std::fmt;
use std::sync::Arc;
use common_telemetry::error;
use dashmap::DashMap;
use store_api::storage::IndexVersion;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileMeta, RegionIndexId, delete_files};
use crate::sst::file::{FileMeta, RegionIndexId, delete_files, delete_index};
use crate::sst::file_ref::FileReferenceManagerRef;
/// Index file reference tracking information.
///
/// One value is kept per `RegionIndexId` in the purger's `index_refs` map.
#[derive(Debug, Default)]
struct IndexRefInfo {
    /// Number of outstanding references; bumped by `add_index_reference` and decremented by
    /// `remove_index_reference`.
    ref_count: usize,
    /// Set by `mark_index_deleted`. When the count later drops to zero with this flag set,
    /// `remove_index_reference` reports that the index should be deleted.
    is_deleted: bool,
}
/// A worker to delete files in background.
pub trait FilePurger: Send + Sync + fmt::Debug {
/// Send a request to remove the file.
@@ -65,6 +73,8 @@ pub struct LocalFilePurger {
scheduler: SchedulerRef,
sst_layer: AccessLayerRef,
cache_manager: Option<CacheManagerRef>,
/// Maps RegionIndexId to (ref_count, is_deleted) for index files
index_refs: Arc<DashMap<RegionIndexId, IndexRefInfo>>,
}
impl fmt::Debug for LocalFilePurger {
@@ -123,6 +133,7 @@ impl LocalFilePurger {
scheduler,
sst_layer,
cache_manager,
index_refs: Arc::new(DashMap::new()),
}
}
@@ -131,6 +142,66 @@ impl LocalFilePurger {
self.scheduler.stop(true).await
}
/// Add a reference to an index file
pub fn add_index_reference(&self, index_id: RegionIndexId) {
self.index_refs
.entry(index_id)
.and_modify(|info| {
info.ref_count += 1;
})
.or_insert(IndexRefInfo {
ref_count: 1,
is_deleted: false,
});
}
/// Releases one reference to an index file.
///
/// Returns `true` when this was the last reference and the index had already been marked
/// deleted, i.e. the caller should now delete the index file. Returns `false` otherwise,
/// including when the index is not tracked at all.
///
/// The decrement and the removal of an exhausted entry happen inside a single
/// `remove_if_mut` call, so a concurrent `add_index_reference` cannot slip in between the two
/// steps and have its fresh reference wiped out.
pub fn remove_index_reference(&self, index_id: &RegionIndexId) -> bool {
    let mut should_delete = false;
    self.index_refs.remove_if_mut(index_id, |_, info| {
        // Saturate so that an unbalanced release cannot underflow the counter.
        info.ref_count = info.ref_count.saturating_sub(1);
        let unreferenced = info.ref_count == 0;
        should_delete = unreferenced && info.is_deleted;
        // Entries without references are dropped from the map.
        unreferenced
    });
    should_delete
}
/// Flags an index file as deleted without deleting it.
///
/// Returns `true` when the index has no references (either it is not tracked, or its count
/// is zero) and can therefore be deleted right away; `false` when references remain, in
/// which case the flag stays recorded on the entry.
pub fn mark_index_deleted(&self, index_id: &RegionIndexId) -> bool {
    match self.index_refs.get_mut(index_id) {
        // Nothing tracked for this index: no references exist.
        None => true,
        Some(mut entry) => {
            entry.is_deleted = true;
            let unreferenced = entry.ref_count == 0;
            // The guard must be released before touching the map again.
            drop(entry);
            if unreferenced {
                self.index_refs.remove(index_id);
            }
            unreferenced
        }
    }
}
/// Marks the given index version of `file_meta` as deleted and, if nothing references it,
/// hands it to `delete_index` immediately.
fn check_and_delete_index(&self, file_meta: FileMeta, index_version: IndexVersion) {
    let index_id = RegionIndexId::new(file_meta.file_id(), index_version);
    let unreferenced = self.mark_index_deleted(&index_id);
    if !unreferenced {
        // Still referenced: deletion is left to whoever releases the last reference.
        return;
    }
    self.delete_index(file_meta, index_version);
}
/// Deletes the file(and it's index, if any) from cache and storage.
fn delete_file(&self, file_meta: FileMeta) {
let sst_layer = self.sst_layer.clone();
@@ -158,9 +229,18 @@ impl LocalFilePurger {
return;
}
let sst_layer = self.sst_layer.clone();
let cache_manager = self.cache_manager.clone();
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
let index_id = RegionIndexId::new(file_meta.file_id(), old_version);
if let Err(e) = sst_layer.delete_index(&index_id).await {
if let Err(e) = delete_index(
file_meta.region_id,
file_meta.file_id,
old_version,
&sst_layer,
&cache_manager,
)
.await
{
error!(e; "Failed to delete index {:?} from storage", index_id);
}
})) {
@@ -171,13 +251,34 @@ impl LocalFilePurger {
impl FilePurger for LocalFilePurger {
    /// Releases the reference held on the file's current index version and, when `is_delete`
    /// is set, deletes the file itself.
    fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
        // Release reference to current index version if it exists
        if file_meta.exists_index() {
            let index_id = RegionIndexId::new(file_meta.file_id(), file_meta.index_version);
            // If this index was marked for deletion and ref count reached 0, delete it
            if self.remove_index_reference(&index_id) {
                self.delete_index(file_meta.clone(), file_meta.index_version);
            }
        }

        if is_delete {
            self.delete_file(file_meta);
        }
    }

    /// Retires `old_version` of the file's index after it has been replaced.
    ///
    /// The old index is only marked deleted here; it is actually deleted right away only when
    /// nothing references it any more. It must not be deleted unconditionally, otherwise
    /// the reference tracking is bypassed.
    fn update_index(&self, file_meta: FileMeta, old_version: IndexVersion) {
        self.check_and_delete_index(file_meta, old_version);
    }

    /// Registers a reference to the file's current index version, if the file has an index.
    fn new_file(&self, file_meta: &FileMeta) {
        if file_meta.exists_index() {
            self.add_index_reference(RegionIndexId::new(
                file_meta.file_id(),
                file_meta.index_version,
            ));
        }
    }
}
@@ -363,4 +464,809 @@ mod tests {
assert!(!object_store.exists(&path).await.unwrap());
assert!(!object_store.exists(&index_path).await.unwrap());
}
// Tests for Index Reference Counting Logic
#[tokio::test]
async fn test_add_index_reference() {
    // Purger on top of a throwaway local-fs access layer.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-add-index-ref");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store,
        puffin_factory,
        intermediate,
    ));
    let purger = LocalFilePurger::new(scheduler.clone(), access_layer, None);

    let index_id = RegionIndexId::new(
        RegionFileId::new(RegionId::new(0, 0), FileId::random()),
        1,
    );

    // The first call creates the entry, the second one bumps its count.
    for expected in 1..=2 {
        purger.add_index_reference(index_id);
        let info = purger.index_refs.get(&index_id).unwrap();
        assert_eq!(info.ref_count, expected);
        assert!(!info.is_deleted);
    }

    scheduler.stop(true).await.unwrap();
}
#[tokio::test]
async fn test_remove_index_reference() {
    // Purger on top of a throwaway local-fs access layer.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-remove-index-ref");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store,
        puffin_factory,
        intermediate,
    ));
    let purger = LocalFilePurger::new(scheduler.clone(), access_layer, None);

    // Start from two references on the same index.
    let index_id = RegionIndexId::new(
        RegionFileId::new(RegionId::new(0, 0), FileId::random()),
        1,
    );
    purger.add_index_reference(index_id);
    purger.add_index_reference(index_id);

    // 2 -> 1: still referenced, nothing to delete.
    assert!(!purger.remove_index_reference(&index_id));
    assert_eq!(purger.index_refs.get(&index_id).unwrap().ref_count, 1);

    // 1 -> 0: the index was never marked deleted, so no deletion is requested,
    // but the exhausted entry leaves the map.
    assert!(!purger.remove_index_reference(&index_id));
    assert!(purger.index_refs.get(&index_id).is_none());

    scheduler.stop(true).await.unwrap();
}
#[tokio::test]
async fn test_remove_index_reference_with_deleted_flag() {
    // Purger on top of a throwaway local-fs access layer.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-remove-index-ref-deleted");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store,
        puffin_factory,
        intermediate,
    ));
    let purger = LocalFilePurger::new(scheduler.clone(), access_layer, None);

    // A single reference on the index.
    let index_id = RegionIndexId::new(
        RegionFileId::new(RegionId::new(0, 0), FileId::random()),
        1,
    );
    purger.add_index_reference(index_id);

    // Marking while a reference is alive must not request deletion.
    assert!(!purger.mark_index_deleted(&index_id));

    // Releasing the last reference of a marked index requests deletion
    // and drops the entry from the map.
    assert!(purger.remove_index_reference(&index_id));
    assert!(purger.index_refs.get(&index_id).is_none());

    scheduler.stop(true).await.unwrap();
}
// Tests for Deletion Logic
#[tokio::test]
async fn test_mark_index_deleted() {
    // Purger on top of a throwaway local-fs access layer.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-mark-index-deleted");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store,
        puffin_factory,
        intermediate,
    ));
    let purger = LocalFilePurger::new(scheduler.clone(), access_layer, None);

    let index_id = RegionIndexId::new(
        RegionFileId::new(RegionId::new(0, 0), FileId::random()),
        1,
    );

    // An untracked index has no references, so it can be deleted immediately.
    assert!(purger.mark_index_deleted(&index_id));

    // With one live reference the mark is recorded but deletion is deferred.
    purger.add_index_reference(index_id);
    assert!(!purger.mark_index_deleted(&index_id));
    {
        let info = purger.index_refs.get(&index_id).unwrap();
        assert!(info.is_deleted);
        assert_eq!(info.ref_count, 1);
    }

    scheduler.stop(true).await.unwrap();
}
#[tokio::test]
async fn test_check_and_delete_index() {
    common_telemetry::init_default_ut_logging();

    // Purger on top of a throwaway local-fs access layer; the store handle is kept
    // so the test can inspect what is on disk.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-check-delete-index");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store.clone(),
        puffin_factory,
        intermediate,
    ));
    let purger = LocalFilePurger::new(scheduler.clone(), access_layer, None);

    // Write an index file (version 1) to the store.
    let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
    let index_id = RegionIndexId::new(file_id, 1);
    let index_path = location::index_file_path("table1", index_id, PathType::Bare);
    store.write(&index_path, vec![0; 4096]).await.unwrap();

    // Metadata describing a file that owns that index.
    let meta = FileMeta {
        region_id: file_id.region_id(),
        file_id: file_id.file_id(),
        time_range: FileTimeRange::default(),
        level: 0,
        file_size: 4096,
        available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        indexes: vec![ColumnIndexMetadata {
            column_id: 0,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }],
        index_file_size: 4096,
        index_version: 1,
        num_rows: 1024,
        num_row_groups: 1,
        sequence: NonZeroU64::new(4096),
        partition_expr: None,
        num_series: 0,
    };

    // While a reference is held, the check only marks the index.
    purger.add_index_reference(index_id);
    purger.check_and_delete_index(meta.clone(), 1);
    assert!(store.exists(&index_path).await.unwrap());

    // Once the reference is gone, the next check schedules the deletion.
    purger.remove_index_reference(&index_id);
    purger.check_and_delete_index(meta, 1);

    // Give the background task time to run.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    assert!(!store.exists(&index_path).await.unwrap());

    scheduler.stop(true).await.unwrap();
}
// Tests for FileHandle Lifecycle Integration
#[tokio::test]
async fn test_file_handle_lifecycle_with_index() {
    common_telemetry::init_default_ut_logging();

    // Shared purger on top of a throwaway local-fs access layer.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-file-handle-lifecycle");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store.clone(),
        puffin_factory,
        intermediate,
    ));
    let purger = Arc::new(LocalFilePurger::new(
        scheduler.clone(),
        access_layer.clone(),
        None,
    ));

    // Put an SST and its index (version 1) into the store.
    let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
    let index_id = RegionIndexId::new(file_id, 1);
    let sst_path = location::sst_file_path("table1", file_id, PathType::Bare);
    let index_path = location::index_file_path("table1", index_id, PathType::Bare);
    store.write(&sst_path, vec![0; 4096]).await.unwrap();
    store.write(&index_path, vec![0; 4096]).await.unwrap();

    let meta = FileMeta {
        region_id: file_id.region_id(),
        file_id: file_id.file_id(),
        time_range: FileTimeRange::default(),
        level: 0,
        file_size: 4096,
        available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        indexes: vec![ColumnIndexMetadata {
            column_id: 0,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }],
        index_file_size: 4096,
        index_version: 1,
        num_rows: 1024,
        num_row_groups: 1,
        sequence: NonZeroU64::new(4096),
        partition_expr: None,
        num_series: 0,
    };

    // Building a handle is expected to register one index reference.
    let first = FileHandle::new(meta.clone(), purger.clone());
    {
        let info = purger.index_refs.get(&index_id).unwrap();
        assert_eq!(info.ref_count, 1);
        assert!(!info.is_deleted);
    }

    // Cloning the handle must not register another reference.
    let second = first.clone();
    assert_eq!(purger.index_refs.get(&index_id).unwrap().ref_count, 1);

    // Mark the file deleted, then drop only one of the two handles:
    // both files are expected to survive.
    first.mark_deleted();
    drop(first);
    assert!(store.exists(&sst_path).await.unwrap());
    assert!(store.exists(&index_path).await.unwrap());

    // Dropping the remaining handle triggers the actual purge.
    drop(second);

    // Give the background task time to run.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    assert!(!store.exists(&sst_path).await.unwrap());
    assert!(!store.exists(&index_path).await.unwrap());

    scheduler.stop(true).await.unwrap();
}
#[tokio::test]
async fn test_update_index_version() {
    common_telemetry::init_default_ut_logging();

    // Shared purger on top of a throwaway local-fs access layer.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-update-index-version");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store.clone(),
        puffin_factory,
        intermediate,
    ));
    let purger = Arc::new(LocalFilePurger::new(
        scheduler.clone(),
        access_layer.clone(),
        None,
    ));

    // Two index versions of the same file live in the store: 1 (old) and 2 (new).
    let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
    let old_index_id = RegionIndexId::new(file_id, 1);
    let new_index_id = RegionIndexId::new(file_id, 2);
    let old_index_path = location::index_file_path("table1", old_index_id, PathType::Bare);
    let new_index_path = location::index_file_path("table1", new_index_id, PathType::Bare);
    store.write(&old_index_path, vec![0; 4096]).await.unwrap();
    store.write(&new_index_path, vec![0; 4096]).await.unwrap();

    // The metadata already points at the new version.
    let meta = FileMeta {
        region_id: file_id.region_id(),
        file_id: file_id.file_id(),
        time_range: FileTimeRange::default(),
        level: 0,
        file_size: 4096,
        available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        indexes: vec![ColumnIndexMetadata {
            column_id: 0,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }],
        index_file_size: 4096,
        index_version: 2,
        num_rows: 1024,
        num_row_groups: 1,
        sequence: NonZeroU64::new(4096),
        partition_expr: None,
        num_series: 0,
    };

    // The handle is expected to register a reference on the new index.
    let handle = FileHandle::new(meta.clone(), purger.clone());
    {
        let info = purger.index_refs.get(&new_index_id).unwrap();
        assert_eq!(info.ref_count, 1);
        assert!(!info.is_deleted);
    }

    // Retire version 1. It never had references, so it must not be tracked afterwards.
    purger.update_index(meta.clone(), 1);
    assert!(purger.index_refs.get(&old_index_id).is_none());

    // The new index keeps its single, unmarked reference.
    {
        let info = purger.index_refs.get(&new_index_id).unwrap();
        assert_eq!(info.ref_count, 1);
        assert!(!info.is_deleted);
    }

    drop(handle);

    // Give background tasks time to run.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    // The handle was never marked deleted; the current (new) index file must remain.
    assert!(store.exists(&new_index_path).await.unwrap());

    scheduler.stop(true).await.unwrap();
}
// Tests for Edge Cases and Error Conditions
#[tokio::test]
async fn test_concurrent_index_operations() {
    // Shared purger on top of a throwaway local-fs access layer.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-concurrent-index-ops");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store,
        puffin_factory,
        intermediate,
    ));
    let purger = Arc::new(LocalFilePurger::new(scheduler, access_layer, None));

    let index_id = RegionIndexId::new(
        RegionFileId::new(RegionId::new(0, 0), FileId::random()),
        1,
    );

    // Ten tasks each register one reference on the same index.
    let adders: Vec<_> = (0..10)
        .map(|_| {
            let purger = purger.clone();
            let idx = index_id;
            tokio::spawn(async move {
                purger.add_index_reference(idx);
            })
        })
        .collect();
    for task in adders {
        task.await.unwrap();
    }

    // Every registration must have been counted.
    {
        let info = purger.index_refs.get(&index_id).unwrap();
        assert_eq!(info.ref_count, 10);
        assert!(!info.is_deleted);
    }

    // Ten tasks each release one reference again.
    let removers: Vec<_> = (0..10)
        .map(|_| {
            let purger = purger.clone();
            let idx = index_id;
            tokio::spawn(async move {
                purger.remove_index_reference(&idx);
            })
        })
        .collect();
    for task in removers {
        task.await.unwrap();
    }

    // With the count back at zero the entry must be gone.
    assert!(purger.index_refs.get(&index_id).is_none());
}
#[tokio::test]
async fn test_multiple_file_handles_same_index() {
    // Shared purger on top of a throwaway local-fs access layer.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-multiple-handles-same-index");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store,
        puffin_factory,
        intermediate,
    ));
    let purger = Arc::new(LocalFilePurger::new(scheduler, access_layer, None));

    let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
    let index_id = RegionIndexId::new(file_id, 1);
    let meta = FileMeta {
        region_id: file_id.region_id(),
        file_id: file_id.file_id(),
        time_range: FileTimeRange::default(),
        level: 0,
        file_size: 4096,
        available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        indexes: vec![ColumnIndexMetadata {
            column_id: 0,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }],
        index_file_size: 4096,
        index_version: 1,
        num_rows: 1024,
        num_row_groups: 1,
        sequence: NonZeroU64::new(4096),
        partition_expr: None,
        num_series: 0,
    };

    // Three independently constructed handles for the same file:
    // each one is expected to register its own reference.
    let first = FileHandle::new(meta.clone(), purger.clone());
    let second = FileHandle::new(meta.clone(), purger.clone());
    let third = FileHandle::new(meta, purger.clone());
    assert_eq!(purger.index_refs.get(&index_id).unwrap().ref_count, 3);

    // Flag every handle as deleted.
    first.mark_deleted();
    second.mark_deleted();
    third.mark_deleted();

    // Each drop releases exactly one reference.
    drop(first);
    assert_eq!(purger.index_refs.get(&index_id).unwrap().ref_count, 2);

    drop(second);
    assert_eq!(purger.index_refs.get(&index_id).unwrap().ref_count, 1);

    // The last drop exhausts the count, so the entry leaves the map.
    drop(third);
    assert!(purger.index_refs.get(&index_id).is_none());
}
#[tokio::test]
async fn test_error_handling_during_deletion() {
common_telemetry::init_default_ut_logging();
let scheduler = Arc::new(LocalScheduler::new(3));
let dir = create_temp_dir("test-error-handling-deletion");
let dir_path = dir.path().display().to_string();
let builder = Fs::default().root(&dir_path);
let object_store = ObjectStore::new(builder).unwrap().finish();
let index_aux_path = dir.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let layer = Arc::new(AccessLayer::new(
"table1",
PathType::Bare,
object_store.clone(),
puffin_mgr,
intm_mgr,
));
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer.clone(), None));
// Create a file with index
let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
let index_id = RegionIndexId::new(file_id, 1);
let file_path = location::sst_file_path("table1", file_id, PathType::Bare);
let index_path = location::index_file_path("table1", index_id, PathType::Bare);
object_store.write(&file_path, vec![0; 4096]).await.unwrap();
object_store
.write(&index_path, vec![0; 4096])
.await
.unwrap();
let file_meta = FileMeta {
region_id: file_id.region_id(),
file_id: file_id.file_id(),
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 4096,
index_version: 1,
num_rows: 1024,
num_row_groups: 1,
sequence: NonZeroU64::new(4096),
partition_expr: None,
num_series: 0,
};
// Create FileHandle
let handle = FileHandle::new(file_meta, file_purger.clone());
// Manually delete the index file to simulate an error during deletion
object_store.delete(&index_path).await.unwrap();
// Mark file as deleted and drop handle
handle.mark_deleted();
drop(handle);
// Wait for async deletion - should handle error gracefully
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// File should still be deleted despite index deletion error
assert!(!object_store.exists(&file_path).await.unwrap());
scheduler.stop(true).await.unwrap();
}
// Tests for Integration with Storage Layer
#[tokio::test]
async fn test_local_file_system_integration() {
    common_telemetry::init_default_ut_logging();

    // Shared purger on top of a throwaway local-fs access layer.
    let scheduler = Arc::new(LocalScheduler::new(3));
    let temp_dir = create_temp_dir("test-local-fs-integration");
    let root = temp_dir.path().display().to_string();
    let store = ObjectStore::new(Fs::default().root(&root))
        .unwrap()
        .finish();
    let aux_dir = temp_dir.path().join("index_aux");
    let puffin_factory = PuffinManagerFactory::new(&aux_dir, 4096, None, None)
        .await
        .unwrap();
    let intermediate = IntermediateManager::init_fs(aux_dir.to_str().unwrap())
        .await
        .unwrap();
    let access_layer = Arc::new(AccessLayer::new(
        "table1",
        PathType::Bare,
        store.clone(),
        puffin_factory,
        intermediate,
    ));
    let purger = Arc::new(LocalFilePurger::new(
        scheduler.clone(),
        access_layer.clone(),
        None,
    ));

    // Put an SST and its index (version 1) into the store.
    let file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
    let index_id = RegionIndexId::new(file_id, 1);
    let sst_path = location::sst_file_path("table1", file_id, PathType::Bare);
    let index_path = location::index_file_path("table1", index_id, PathType::Bare);
    store.write(&sst_path, vec![0; 4096]).await.unwrap();
    store.write(&index_path, vec![0; 4096]).await.unwrap();

    let meta = FileMeta {
        region_id: file_id.region_id(),
        file_id: file_id.file_id(),
        time_range: FileTimeRange::default(),
        level: 0,
        file_size: 4096,
        available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        indexes: vec![ColumnIndexMetadata {
            column_id: 0,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }],
        index_file_size: 4096,
        index_version: 1,
        num_rows: 1024,
        num_row_groups: 1,
        sequence: NonZeroU64::new(4096),
        partition_expr: None,
        num_series: 0,
    };

    let handle = FileHandle::new(meta, purger.clone());

    // Both files are on disk before anything is purged.
    assert!(store.exists(&sst_path).await.unwrap());
    assert!(store.exists(&index_path).await.unwrap());

    // Mark the file deleted and release the only handle.
    handle.mark_deleted();
    drop(handle);

    // Give the background task time to run.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

    // Both the SST and its index must be gone.
    assert!(!store.exists(&sst_path).await.unwrap());
    assert!(!store.exists(&index_path).await.unwrap());

    scheduler.stop(true).await.unwrap();
}
}