Compare commits

...

7 Commits

Author SHA1 Message Date
discord9
122dfbf9f8 reviewing tests
Signed-off-by: discord9 <discord9@163.com>
2025-12-10 14:23:33 +08:00
discord9
3c7fa84442 chore
Signed-off-by: discord9 <discord9@163.com>
2025-12-10 14:17:51 +08:00
discord9
0cd368d1f6 feat: track index?
Signed-off-by: discord9 <discord9@163.com>
2025-12-10 14:09:10 +08:00
discord9
c82208dbc0 chore: unused
Signed-off-by: discord9 <discord9@163.com>
2025-12-09 19:19:55 +08:00
discord9
bbbe91e97a feat: better method
Signed-off-by: discord9 <discord9@163.com>
2025-12-09 19:19:55 +08:00
discord9
987e1b5a15 noop
Signed-off-by: discord9 <discord9@163.com>
2025-12-09 19:19:55 +08:00
discord9
9cac640b41 feat: delete index files properly
Signed-off-by: discord9 <discord9@163.com>
2025-12-09 19:19:55 +08:00
10 changed files with 1064 additions and 55 deletions

View File

@@ -565,6 +565,7 @@ fn new_noop_file_purger() -> FilePurgerRef {
struct Noop;
// Test-only purger that deliberately does nothing: files and index versions
// handed to it are simply dropped, so tests never touch real storage.
impl FilePurger for Noop {
// No-op: the file is neither scheduled for deletion nor tracked.
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
// No-op: stale index versions are not purged in tests.
fn update_index(&self, _file_meta: FileMeta, _version: store_api::storage::IndexVersion) {}
}
Arc::new(Noop)
}

View File

@@ -228,22 +228,25 @@ impl AccessLayer {
// Delete all versions of the index file.
for version in 0..=index_file_id.version {
let path = location::index_file_path(
&self.table_dir,
RegionIndexId::new(index_file_id.file_id, version),
self.path_type,
);
self.object_store
.delete(&path)
.await
.context(DeleteIndexSnafu {
file_id: region_file_id.file_id(),
})?;
self.delete_index(&RegionIndexId::new(index_file_id.file_id, version))
.await?;
}
Ok(())
}
/// Deletes a single index file — one exact `(file_id, index_version)` pair —
/// from object storage.
///
/// # Errors
/// Wraps any object-store failure in `DeleteIndexSnafu`, tagged with the
/// owning file id so callers can tell which index failed to delete.
pub(crate) async fn delete_index(&self, region_index_id: &RegionIndexId) -> Result<()> {
    // Resolve the storage path for this specific index version.
    let index_path = location::index_file_path(&self.table_dir, *region_index_id, self.path_type);
    let delete_result = self.object_store.delete(&index_path).await;
    delete_result.context(DeleteIndexSnafu {
        file_id: region_index_id.file_id(),
    })?;
    Ok(())
}
/// Returns the directory of the region in the table.
pub fn build_region_dir(&self, region_id: RegionId) -> String {
region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)

View File

@@ -21,11 +21,10 @@ use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
use store_api::storage::{ColumnId, FileId};
use store_api::storage::{ColumnId, FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::IndexVersion;
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

View File

@@ -22,11 +22,10 @@ use bytes::Bytes;
use index::inverted_index::error::Result;
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message;
use store_api::storage::FileId;
use store_api::storage::{FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::IndexVersion;
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

View File

@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, RegionId};
use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
@@ -82,8 +82,6 @@ pub type Level = u8;
pub const MAX_LEVEL: Level = 2;
/// Type to store index types for a column.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
/// Index version
pub type IndexVersion = u64;
/// Cross-region file id.
///
@@ -199,7 +197,7 @@ pub struct FileMeta {
/// Version of the index file.
/// Used to generate the index file name: "{file_id}.{index_version}.puffin".
/// Default is 0 (which maps to "{file_id}.puffin" for compatibility).
pub index_version: u64,
pub index_version: IndexVersion,
/// Number of rows in the file.
///
/// For historical reasons, this field might be missing in old files. Thus
@@ -540,38 +538,89 @@ pub async fn delete_files(
);
for (file_id, index_version) in file_ids {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
.remove(IndexKey::new(
region_id,
*file_id,
FileType::Puffin(*index_version),
))
.await;
}
purge_index_write_cache_stager(
region_id,
delete_index,
access_layer,
cache_manager,
file_id,
index_version,
)
.await;
}
Ok(())
}
// Remove the SST file from the cache.
/// Delete index file for a given SST file&index version.
pub async fn delete_index(
region_id: RegionId,
file_id: FileId,
index_version: u64,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
if let Err(err) = access_layer
.delete_index(&RegionIndexId::new(
RegionFileId::new(region_id, file_id),
index_version,
))
.await
{
error!(err; "Failed to delete index file for {}/{}.{}",
region_id, file_id, index_version);
}
purge_index_write_cache_stager(
region_id,
true,
access_layer,
cache_manager,
&file_id,
&index_version,
)
.await;
Ok(())
}
// Purges write-cache entries (index + SST) and staged index content for one
// file/index-version pair.
//
// NOTE(review): this hunk appears to have lost its diff +/- markers during
// extraction, so removed and added lines are merged below. In particular:
// the chained double `.remove(` call, the duplicated stager-purge block after
// the closing brace, and the trailing `Ok(())` in a function with no return
// type cannot all belong to the final code — reconcile against the real diff
// before treating this as source.
pub async fn purge_index_write_cache_stager(
region_id: RegionId,
delete_index: bool,
access_layer: &Arc<crate::access_layer::AccessLayer>,
cache_manager: &Option<Arc<crate::cache::CacheManager>>,
file_id: &FileId,
// NOTE(review): presumably should be `&IndexVersion`; also, passing a Copy
// u64 by reference is unidiomatic — TODO confirm against the real diff.
index_version: &u64,
) {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
// NOTE(review): the `FileType::Parquet` line below looks like the removed
// (old) line merged with the added multi-line `FileType::Puffin` call.
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
.remove(IndexKey::new(
region_id,
*file_id,
FileType::Puffin(*index_version),
))
.await;
}
// Purges index content in the stager.
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionIndexId::new(
RegionFileId::new(region_id, *file_id),
*index_version,
))
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
file_id, index_version, region_id);
}
// Remove the SST file from the cache.
write_cache
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
.await;
}
// NOTE(review): duplicate of the stager-purge above — likely the removed copy
// from the old `delete_files` body that this helper was extracted from.
// Purges index content in the stager.
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionIndexId::new(
RegionFileId::new(region_id, *file_id),
*index_version,
))
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
file_id, index_version, region_id);
}
// NOTE(review): `Ok(())` cannot compile in a `()`-returning fn — almost
// certainly a removed line from the old `delete_files`.
Ok(())
}
#[cfg(test)]

File diff suppressed because it is too large. (Load Diff)

View File

@@ -132,7 +132,11 @@ impl FileReferenceManager {
let region_id = file_meta.region_id;
let mut is_new = false;
{
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
let file_ref = FileRef::new(
file_meta.region_id,
file_meta.file_id,
file_meta.index_version,
);
self.files_per_region
.entry(region_id)
.and_modify(|refs| {
@@ -157,7 +161,7 @@ impl FileReferenceManager {
/// If the reference count reaches zero, the file reference will be removed from the manager.
pub fn remove_file(&self, file_meta: &FileMeta) {
let region_id = file_meta.region_id;
let file_ref = FileRef::new(region_id, file_meta.file_id);
let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version);
let mut remove_table_entry = false;
let mut remove_file_ref = false;
@@ -246,13 +250,13 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
);
file_ref_mgr.add_file(&file_meta);
let expected_region_ref_manifest =
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id, 0)]);
assert_eq!(
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
@@ -265,7 +269,7 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 2)])
);
assert_eq!(
@@ -281,7 +285,7 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
);
assert_eq!(

View File

@@ -778,6 +778,7 @@ impl IndexBuildTask {
self.file_meta.available_indexes = output.build_available_indexes();
self.file_meta.indexes = output.build_indexes();
self.file_meta.index_file_size = output.file_size;
let old_index_version = self.file_meta.index_version;
self.file_meta.index_version = new_index_version;
let edit = RegionEdit {
files_to_add: vec![self.file_meta.clone()],
@@ -801,6 +802,11 @@ impl IndexBuildTask {
self.file_meta.region_id,
self.reason.as_str()
);
// notify the file purger to remove the old index files if any
if new_index_version > 0 {
self.file_purger
.update_index(self.file_meta.clone(), old_index_version);
}
Ok(edit)
}
}

View File

@@ -26,6 +26,6 @@ pub use datatypes::schema::{
};
pub use self::descriptors::*;
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError};
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError};
pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
pub use self::types::{SequenceNumber, SequenceRange};

View File

@@ -24,6 +24,9 @@ use uuid::Uuid;
use crate::ManifestVersion;
use crate::storage::RegionId;
/// Index version, incremented each time the index file for an SST is rebuilt.
/// Participates in the index file name ("{file_id}.{index_version}.puffin");
/// version 0 maps to the legacy "{file_id}.puffin" name for compatibility.
pub type IndexVersion = u64;
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
source: uuid::Error,
@@ -70,15 +73,21 @@ impl FromStr for FileId {
}
}
/// A reference to a specific file & index version in a region, indicating that
/// a `FileHandle` for it is currently held.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FileRef {
// Region that owns the referenced file.
pub region_id: RegionId,
// Id of the referenced SST file.
pub file_id: FileId,
// Version of the file's index; part of identity (Eq/Hash), so the same file
// at different index versions yields distinct references.
pub index_version: IndexVersion,
}
impl FileRef {
pub fn new(region_id: RegionId, file_id: FileId) -> Self {
Self { region_id, file_id }
pub fn new(region_id: RegionId, file_id: FileId, index_version: u64) -> Self {
Self {
region_id,
file_id,
index_version,
}
}
}