diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index 0c5262797c..211c8932cd 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -565,13 +565,7 @@ fn new_noop_file_purger() -> FilePurgerRef { struct Noop; impl FilePurger for Noop { fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {} - fn remove_index( - &self, - _file_meta: FileMeta, - _version: mito2::sst::file::IndexVersion, - _is_delete: bool, - ) { - } + fn update_index(&self, _file_meta: FileMeta, _version: store_api::storage::IndexVersion) {} } Arc::new(Noop) } diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index fc39192c98..707afb18a6 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -21,11 +21,10 @@ use async_trait::async_trait; use bytes::Bytes; use index::bloom_filter::error::Result; use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader}; -use store_api::storage::{ColumnId, FileId}; +use store_api::storage::{ColumnId, FileId, IndexVersion}; use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey}; use crate::metrics::{CACHE_HIT, CACHE_MISS}; -use crate::sst::file::IndexVersion; const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index"; diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index f99fa0218c..4c1b07126c 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -22,11 +22,10 @@ use bytes::Bytes; use index::inverted_index::error::Result; use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; use prost::Message; -use store_api::storage::FileId; +use store_api::storage::{FileId, IndexVersion}; use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey}; use crate::metrics::{CACHE_HIT, CACHE_MISS}; -use crate::sst::file::IndexVersion; const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index"; diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index bd12720011..af34503d0a 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use store_api::metadata::ColumnMetadata; use store_api::region_request::PathType; -use store_api::storage::{ColumnId, FileId, RegionId}; +use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId}; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; @@ -82,8 +82,6 @@ pub type Level = u8; pub const MAX_LEVEL: Level = 2; /// Type to store index types for a column. pub type IndexTypes = SmallVec<[IndexType; 4]>; -/// Index version -pub type IndexVersion = u64; /// Cross-region file id. /// @@ -199,7 +197,7 @@ pub struct FileMeta { /// Version of the index file. /// Used to generate the index file name: "{file_id}.{index_version}.puffin". /// Default is 0 (which maps to "{file_id}.puffin" for compatibility). - pub index_version: u64, + pub index_version: IndexVersion, /// Number of rows in the file. /// /// For historical reasons, this field might be missing in old files. Thus diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 8ef82728b6..710ba43677 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -16,12 +16,13 @@ use std::fmt; use std::sync::Arc; use common_telemetry::error; +use store_api::storage::IndexVersion; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; use crate::error::Result; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::{FileMeta, IndexVersion, RegionIndexId, delete_files}; +use crate::sst::file::{FileMeta, RegionIndexId, delete_files}; use crate::sst::file_ref::FileReferenceManagerRef; /// A worker to delete files in background. @@ -31,9 +32,9 @@ pub trait FilePurger: Send + Sync + fmt::Debug { /// Otherwise, only the reference will be removed. fn remove_file(&self, file_meta: FileMeta, is_delete: bool); - /// Remove the index with given version. Notice that this only removes one index file - /// given by `version`, ignore the version in `file_meta`. - fn remove_index(&self, file_meta: FileMeta, version: IndexVersion, is_delete: bool); + /// Update index version of the file. The new `FileMeta` contains the updated index version. + /// `old_version` is the previous index version before update. + fn update_index(&self, file_meta: FileMeta, old_version: IndexVersion); /// Notify the purger of a new file created. /// This is useful for object store based storage, where we need to track the file references @@ -54,7 +55,7 @@ impl FilePurger for NoopFilePurger { // noop } - fn remove_index(&self, _file_meta: FileMeta, _version: IndexVersion, _is_delete: bool) { + fn update_index(&self, _file_meta: FileMeta, _old_version: IndexVersion) { // noop } } @@ -151,10 +152,14 @@ impl LocalFilePurger { } } - fn delete_index(&self, file_meta: FileMeta, version: IndexVersion) { + fn delete_index(&self, file_meta: FileMeta, old_version: IndexVersion) { + if file_meta.index_version == 0 { + // no index to delete + return; + } let sst_layer = self.sst_layer.clone(); if let Err(e) = self.scheduler.schedule(Box::pin(async move { - let index_id = RegionIndexId::new(file_meta.file_id(), version); + let index_id = RegionIndexId::new(file_meta.file_id(), old_version); if let Err(e) = sst_layer.delete_index(&index_id).await { error!(e; "Failed to delete index {:?} from storage", index_id); } @@ -171,10 +176,8 @@ impl FilePurger for LocalFilePurger { } } - fn remove_index(&self, file_meta: FileMeta, version: IndexVersion, is_delete: bool) { - if is_delete { - self.delete_index(file_meta, version); - } + fn update_index(&self, file_meta: FileMeta, old_version: IndexVersion) { + self.delete_index(file_meta, old_version); } } @@ -192,8 +195,9 @@ impl FilePurger for ObjectStoreFilePurger { // TODO(discord9): consider impl a .tombstone file to reduce files needed to list } - fn remove_index(&self, _file_meta: FileMeta, _version: IndexVersion, _is_delete: bool) { - // TODO(discord9): add index reference management for object store based storage + fn update_index(&self, _file_meta: FileMeta, _old_version: IndexVersion) { + // nothing need to do for object store + // as new file reference with new index version will be added when new `FileHandle` is created } fn new_file(&self, file_meta: &FileMeta) { diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index 5cb7dc8c88..a058a77e14 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -13,11 +13,12 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; +use std::f64::consts::E; use std::sync::Arc; use common_telemetry::debug; use dashmap::{DashMap, Entry}; -use store_api::storage::{FileRef, FileRefsManifest, RegionId}; +use store_api::storage::{FileRef, FileRefsManifest, IndexVersion, RegionId}; use crate::error::Result; use crate::metrics::GC_REF_FILE_CNT; @@ -132,7 +133,11 @@ impl FileReferenceManager { let region_id = file_meta.region_id; let mut is_new = false; { - let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id); + let file_ref = FileRef::new( + file_meta.region_id, + file_meta.file_id, + file_meta.index_version, + ); self.files_per_region .entry(region_id) .and_modify(|refs| { @@ -157,7 +162,7 @@ impl FileReferenceManager { /// If the reference count reaches zero, the file reference will be removed from the manager. pub fn remove_file(&self, file_meta: &FileMeta) { let region_id = file_meta.region_id; - let file_ref = FileRef::new(region_id, file_meta.file_id); + let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version); let mut remove_table_entry = false; let mut remove_file_ref = false; @@ -246,13 +251,13 @@ mod tests { .get(&file_meta.region_id) .unwrap() .files, - HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)]) + HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)]) ); file_ref_mgr.add_file(&file_meta); let expected_region_ref_manifest = - HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]); + HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id, 0)]); assert_eq!( file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(), @@ -265,7 +270,7 @@ mod tests { .get(&file_meta.region_id) .unwrap() .files, - HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)]) + HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 2)]) ); assert_eq!( @@ -281,7 +286,7 @@ mod tests { .get(&file_meta.region_id) .unwrap() .files, - HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)]) + HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id, 0), 1)]) ); assert_eq!( diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 22a6333ace..4886469bd5 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -778,6 +778,7 @@ impl IndexBuildTask { self.file_meta.available_indexes = output.build_available_indexes(); self.file_meta.indexes = output.build_indexes(); self.file_meta.index_file_size = output.file_size; + let old_index_version = self.file_meta.index_version; self.file_meta.index_version = new_index_version; let edit = RegionEdit { files_to_add: vec![self.file_meta.clone()], @@ -804,7 +805,7 @@ impl IndexBuildTask { // notify the file purger to remove the old index files if any if new_index_version > 0 { self.file_purger - .remove_index(self.file_meta.clone(), new_index_version - 1, true); + .update_index(self.file_meta.clone(), old_index_version); } Ok(edit) } diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 2cafaf027c..36b28b511c 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -26,6 +26,6 @@ pub use datatypes::schema::{ }; pub use self::descriptors::*; -pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError}; +pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError}; pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; pub use self::types::{SequenceNumber, SequenceRange}; diff --git a/src/store-api/src/storage/file.rs b/src/store-api/src/storage/file.rs index e05a818066..75098519a2 100644 --- a/src/store-api/src/storage/file.rs +++ b/src/store-api/src/storage/file.rs @@ -24,6 +24,9 @@ use uuid::Uuid; use crate::ManifestVersion; use crate::storage::RegionId; +/// Index version, incremented when the index file is rebuilt. +pub type IndexVersion = u64; + #[derive(Debug, Snafu, PartialEq)] pub struct ParseIdError { source: uuid::Error, @@ -70,15 +73,21 @@ impl FromStr for FileId { } } +/// Indicating holding a `FileHandle` reference for a specific file&index in a region. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct FileRef { pub region_id: RegionId, pub file_id: FileId, + pub index_version: IndexVersion, } impl FileRef { - pub fn new(region_id: RegionId, file_id: FileId) -> Self { - Self { region_id, file_id } + pub fn new(region_id: RegionId, file_id: FileId, index_version: u64) -> Self { + Self { + region_id, + file_id, + index_version, + } } }