feat: replace `FilePurger::remove_index` with `update_index` and track index version in `FileRef`

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-12-09 12:04:22 +08:00
parent 987e1b5a15
commit bbbe91e97a
9 changed files with 48 additions and 39 deletions

View File

@@ -565,13 +565,7 @@ fn new_noop_file_purger() -> FilePurgerRef {
struct Noop;
impl FilePurger for Noop {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
fn remove_index(
&self,
_file_meta: FileMeta,
_version: mito2::sst::file::IndexVersion,
_is_delete: bool,
) {
}
fn update_index(&self, _file_meta: FileMeta, _version: store_api::storage::IndexVersion) {}
}
Arc::new(Noop)
}

View File

@@ -21,11 +21,10 @@ use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
use store_api::storage::{ColumnId, FileId};
use store_api::storage::{ColumnId, FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::IndexVersion;
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

View File

@@ -22,11 +22,10 @@ use bytes::Bytes;
use index::inverted_index::error::Result;
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message;
use store_api::storage::FileId;
use store_api::storage::{FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::IndexVersion;
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

View File

@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, RegionId};
use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
@@ -82,8 +82,6 @@ pub type Level = u8;
pub const MAX_LEVEL: Level = 2;
/// Type to store index types for a column.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
/// Index version
pub type IndexVersion = u64;
/// Cross-region file id.
///
@@ -199,7 +197,7 @@ pub struct FileMeta {
/// Version of the index file.
/// Used to generate the index file name: "{file_id}.{index_version}.puffin".
/// Default is 0 (which maps to "{file_id}.puffin" for compatibility).
pub index_version: u64,
pub index_version: IndexVersion,
/// Number of rows in the file.
///
/// For historical reasons, this field might be missing in old files. Thus

View File

@@ -16,12 +16,13 @@ use std::fmt;
use std::sync::Arc;
use common_telemetry::error;
use store_api::storage::IndexVersion;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileMeta, IndexVersion, RegionIndexId, delete_files};
use crate::sst::file::{FileMeta, RegionIndexId, delete_files};
use crate::sst::file_ref::FileReferenceManagerRef;
/// A worker to delete files in background.
@@ -31,9 +32,9 @@ pub trait FilePurger: Send + Sync + fmt::Debug {
/// Otherwise, only the reference will be removed.
fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
/// Remove the index with given version. Notice that this only removes one index file
/// given by `version`, ignore the version in `file_meta`.
fn remove_index(&self, file_meta: FileMeta, version: IndexVersion, is_delete: bool);
/// Update index version of the file. The new `FileMeta` contains the updated index version.
/// `old_version` is the previous index version before update.
fn update_index(&self, file_meta: FileMeta, old_version: IndexVersion);
/// Notify the purger of a new file created.
/// This is useful for object store based storage, where we need to track the file references
@@ -54,7 +55,7 @@ impl FilePurger for NoopFilePurger {
// noop
}
fn remove_index(&self, _file_meta: FileMeta, _version: IndexVersion, _is_delete: bool) {
fn update_index(&self, _file_meta: FileMeta, _old_version: IndexVersion) {
// noop
}
}
@@ -151,10 +152,14 @@ impl LocalFilePurger {
}
}
fn delete_index(&self, file_meta: FileMeta, version: IndexVersion) {
fn delete_index(&self, file_meta: FileMeta, old_version: IndexVersion) {
if file_meta.index_version == 0 {
// no index to delete
return;
}
let sst_layer = self.sst_layer.clone();
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
let index_id = RegionIndexId::new(file_meta.file_id(), version);
let index_id = RegionIndexId::new(file_meta.file_id(), old_version);
if let Err(e) = sst_layer.delete_index(&index_id).await {
error!(e; "Failed to delete index {:?} from storage", index_id);
}
@@ -171,10 +176,8 @@ impl FilePurger for LocalFilePurger {
}
}
fn remove_index(&self, file_meta: FileMeta, version: IndexVersion, is_delete: bool) {
if is_delete {
self.delete_index(file_meta, version);
}
fn update_index(&self, file_meta: FileMeta, old_version: IndexVersion) {
self.delete_index(file_meta, old_version);
}
}
@@ -192,8 +195,9 @@ impl FilePurger for ObjectStoreFilePurger {
// TODO(discord9): consider impl a .tombstone file to reduce files needed to list
}
fn remove_index(&self, _file_meta: FileMeta, _version: IndexVersion, _is_delete: bool) {
// TODO(discord9): add index reference management for object store based storage
fn update_index(&self, _file_meta: FileMeta, _old_version: IndexVersion) {
// Nothing needs to be done for object store based storage:
// a new file reference with the new index version is added when the new `FileHandle` is created.
}
fn new_file(&self, file_meta: &FileMeta) {

View File

@@ -13,11 +13,12 @@
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::f64::consts::E;
use std::sync::Arc;
use common_telemetry::debug;
use dashmap::{DashMap, Entry};
use store_api::storage::{FileRef, FileRefsManifest, RegionId};
use store_api::storage::{FileRef, FileRefsManifest, IndexVersion, RegionId};
use crate::error::Result;
use crate::metrics::GC_REF_FILE_CNT;
@@ -132,7 +133,11 @@ impl FileReferenceManager {
let region_id = file_meta.region_id;
let mut is_new = false;
{
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
let file_ref = FileRef::new(
file_meta.region_id,
file_meta.file_id,
file_meta.index_version,
);
self.files_per_region
.entry(region_id)
.and_modify(|refs| {
@@ -157,7 +162,7 @@ impl FileReferenceManager {
/// If the reference count reaches zero, the file reference will be removed from the manager.
pub fn remove_file(&self, file_meta: &FileMeta) {
let region_id = file_meta.region_id;
let file_ref = FileRef::new(region_id, file_meta.file_id);
let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version);
let mut remove_table_entry = false;
let mut remove_file_ref = false;
@@ -246,13 +251,13 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
);
file_ref_mgr.add_file(&file_meta);
let expected_region_ref_manifest =
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id, 0)]);
assert_eq!(
file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
@@ -265,7 +270,7 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 2)])
);
assert_eq!(
@@ -281,7 +286,7 @@ mod tests {
.get(&file_meta.region_id)
.unwrap()
.files,
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)])
);
assert_eq!(

View File

@@ -778,6 +778,7 @@ impl IndexBuildTask {
self.file_meta.available_indexes = output.build_available_indexes();
self.file_meta.indexes = output.build_indexes();
self.file_meta.index_file_size = output.file_size;
let old_index_version = self.file_meta.index_version;
self.file_meta.index_version = new_index_version;
let edit = RegionEdit {
files_to_add: vec![self.file_meta.clone()],
@@ -804,7 +805,7 @@ impl IndexBuildTask {
// notify the file purger to remove the old index files if any
if new_index_version > 0 {
self.file_purger
.remove_index(self.file_meta.clone(), new_index_version - 1, true);
.update_index(self.file_meta.clone(), old_index_version);
}
Ok(edit)
}

View File

@@ -26,6 +26,6 @@ pub use datatypes::schema::{
};
pub use self::descriptors::*;
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError};
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError};
pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
pub use self::types::{SequenceNumber, SequenceRange};

View File

@@ -24,6 +24,9 @@ use uuid::Uuid;
use crate::ManifestVersion;
use crate::storage::RegionId;
/// Version of an index file, incremented when the index file is rebuilt.
///
/// The version is used to generate the index file name
/// ("{file_id}.{index_version}.puffin"); version 0 maps to "{file_id}.puffin"
/// for compatibility.
pub type IndexVersion = u64;
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
source: uuid::Error,
@@ -70,15 +73,21 @@ impl FromStr for FileId {
}
}
/// Indicates that a `FileHandle` reference is held for a specific file and
/// index version in a region.
///
/// The index version takes part in equality and hashing, so references to the
/// same file with different index versions are distinct.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FileRef {
/// Id of the region the referenced file belongs to.
pub region_id: RegionId,
/// Id of the referenced file.
pub file_id: FileId,
/// Index version of the referenced file.
pub index_version: IndexVersion,
}
impl FileRef {
pub fn new(region_id: RegionId, file_id: FileId) -> Self {
Self { region_id, file_id }
pub fn new(region_id: RegionId, file_id: FileId, index_version: u64) -> Self {
Self {
region_id,
file_id,
index_version,
}
}
}