From 9197e818ecf7b5731be8ae6683d8e4a693d927de Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Tue, 9 Dec 2025 15:31:12 +0800
Subject: [PATCH] refactor: use versioned index for index file (#7309)

* refactor: use versioned index for index file
Signed-off-by: discord9
* fix: sst entry table
Signed-off-by: discord9
* update sqlness
Signed-off-by: discord9
* chore: unit type
Signed-off-by: discord9
* fix: missing version
Signed-off-by: discord9
* more fix build index
Signed-off-by: discord9
* fix: use proper index id
Signed-off-by: discord9
* pcr
Signed-off-by: discord9
* test: update
Signed-off-by: discord9
* clippy
Signed-off-by: discord9
* test: test_list_ssts fixed
Signed-off-by: discord9
* test: fix test
Signed-off-by: discord9
* feat: stuff
Signed-off-by: discord9
* fix: clean temp index file on abort&delete all index version when delete file
Signed-off-by: discord9
* docs: explain
Signed-off-by: discord9
* fix: actually clean up tmp dir
Signed-off-by: discord9
* clippy
Signed-off-by: discord9
* clean tmp dir only when write cache enabled
Signed-off-by: discord9
* refactor: add version to index cache
Signed-off-by: discord9
* per review
Signed-off-by: discord9
* test: update size
Signed-off-by: discord9
* per review
Signed-off-by: discord9

---------

Signed-off-by: discord9
---
 src/cmd/src/datanode/objbench.rs | 2 +-
 src/metric-engine/src/engine/flush.rs | 14 +--
 src/mito2/src/access_layer.rs | 51 ++++++--
 src/mito2/src/cache.rs | 55 ++++-----
 src/mito2/src/cache/file_cache.rs | 50 +++++---
 .../src/cache/index/bloom_filter_index.rs | 39 +++---
 src/mito2/src/cache/index/inverted_index.rs | 36 ++++--
 src/mito2/src/cache/write_cache.rs | 11 +-
 src/mito2/src/compaction/compactor.rs | 2 +-
 src/mito2/src/compaction/test_util.rs | 2 +-
 src/mito2/src/engine.rs | 19 +--
 src/mito2/src/engine/basic_test.rs | 20 ++--
 src/mito2/src/engine/index_build_test.rs | 4 +-
 src/mito2/src/engine/puffin_index.rs | 22 ++--
 src/mito2/src/flush.rs | 2 +-
 src/mito2/src/gc.rs | 9 +-
 src/mito2/src/manifest/tests/checkpoint.rs | 4 +-
 src/mito2/src/read/scan_region.rs | 12 ++
 src/mito2/src/region.rs | 11 +-
 src/mito2/src/region/opener.rs | 18 ++-
 src/mito2/src/remap_manifest.rs | 2 +-
 src/mito2/src/sst/file.rs | 105 ++++++++++------
 src/mito2/src/sst/file_purger.rs | 10 +-
 src/mito2/src/sst/file_ref.rs | 2 +-
 src/mito2/src/sst/index.rs | 83 +++++++++----
 .../src/sst/index/bloom_filter/applier.rs | 21 ++--
 .../src/sst/index/bloom_filter/creator.rs | 7 +-
 .../src/sst/index/fulltext_index/applier.rs | 27 +++--
 .../src/sst/index/fulltext_index/creator.rs | 14 +--
 src/mito2/src/sst/index/indexer/abort.rs | 19 +++
 src/mito2/src/sst/index/indexer/finish.rs | 10 +-
 .../src/sst/index/inverted_index/applier.rs | 26 ++--
 .../src/sst/index/inverted_index/creator.rs | 7 +-
 src/mito2/src/sst/index/puffin_manager.rs | 34 ++++--
 src/mito2/src/sst/index/store.rs | 4 +
 src/mito2/src/sst/location.rs | 112 ++++++++++++++++--
 src/mito2/src/sst/parquet.rs | 14 ++-
 src/mito2/src/sst/parquet/reader.rs | 8 +-
 src/mito2/src/sst/parquet/writer.rs | 4 +-
 src/mito2/src/test_util/sst_util.rs | 2 +-
 src/mito2/src/test_util/version_util.rs | 4 +-
 src/mito2/src/worker/handle_manifest.rs | 6 +-
 src/mito2/src/worker/handle_rebuild_index.rs | 6 +-
 .../src/puffin_manager/fs_puffin_manager.rs | 4 +
 src/store-api/src/sst_entry.rs | 22 ++--
 .../common/information_schema/ssts.result | 34 +++---
 .../common/system/information_schema.result | 2 +-
 47 files changed, 649 insertions(+), 323 deletions(-)

diff --git
a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index a8ff8b4daf..bdb12bcbcb 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -163,7 +163,7 @@ impl ObjbenchCommand { available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows, num_row_groups, sequence: None, diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index cdc11db852..02726fc8be 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -119,7 +119,7 @@ mod tests { .index_file_path .map(|path| path.replace(&e.file_id, "")); e.file_id = "".to_string(); - e.index_file_id = e.index_file_id.map(|_| "".to_string()); + e.index_version = 0; format!("\n{:?}", e) }) .sorted() @@ -128,12 +128,12 @@ mod tests { assert_eq!( debug_format, r#" -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_id: Some(""), level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_file_id: Some(""), level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_file_id: Some(""), level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, 
num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", index_file_id: None, level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"# +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", 
region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#, ); // list from storage let storage_entries = mito diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index b6891d7410..ea7cbaca97 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -37,7 +37,7 @@ use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED}; use crate::read::{FlatSource, Source}; use crate::region::options::IndexOptions; -use crate::sst::file::{FileHandle, RegionFileId}; +use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId}; use crate::sst::index::IndexerBuilderImpl; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager}; @@ -216,7 +216,7 @@ impl AccessLayer { pub(crate) async fn delete_sst( &self, region_file_id: &RegionFileId, - index_file_id: &RegionFileId, + index_file_id: &RegionIndexId, ) -> Result<()> { let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type); self.object_store @@ -226,13 +226,20 @@ impl AccessLayer { file_id: region_file_id.file_id(), })?; - let path = location::index_file_path(&self.table_dir, *index_file_id, self.path_type); - self.object_store - .delete(&path) - .await - .context(DeleteIndexSnafu { - file_id: region_file_id.file_id(), - })?; + // Delete all versions of the index file. + for version in 0..=index_file_id.version { + let path = location::index_file_path( + &self.table_dir, + RegionIndexId::new(index_file_id.file_id, version), + self.path_type, + ); + self.object_store + .delete(&path) + .await + .context(DeleteIndexSnafu { + file_id: region_file_id.file_id(), + })?; + } Ok(()) } @@ -291,6 +298,7 @@ impl AccessLayer { puffin_manager: self .puffin_manager_factory .build(store, path_provider.clone()), + write_cache_enabled: false, intermediate_manager: self.intermediate_manager.clone(), index_options: request.index_options, inverted_index_config: request.inverted_index_config, @@ -468,9 +476,10 @@ impl TempFileCleaner { } /// Removes the SST and index file from the local atomic dir by the file id. + /// This only removes the initial index, since the index version is always 0 for a new SST, this method should be safe to pass 0. pub(crate) async fn clean_by_file_id(&self, file_id: FileId) { let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string(); - let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string(); + let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin(0)).to_string(); Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await; } @@ -553,9 +562,12 @@ async fn clean_dir(dir: &str) -> Result<()> { /// Path provider for SST file and index file. pub trait FilePathProvider: Send + Sync { - /// Creates index file path of given file id. + /// Creates index file path of given file id. Version default to 0, and not shown in the path. 
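Earlier in this hunk, `delete_sst` now iterates every index version from 0 up to the file's current one and deletes each object, so any older versions left behind by index rebuilds are cleaned up as well. A small sketch of the path set involved; the helper name and `dir` layout are illustrative, and the naming rule follows the `FileMeta::index_version` documentation later in this patch (`{file_id}.puffin` for version 0, `{file_id}.{version}.puffin` otherwise):

// Illustrative only: which index objects a versioned delete removes.
fn index_paths_to_delete(dir: &str, file_id: &str, current_version: u64) -> Vec<String> {
    (0..=current_version)
        .map(|v| {
            if v == 0 {
                // Version 0 keeps the legacy, unversioned name.
                format!("{dir}/{file_id}.puffin")
            } else {
                format!("{dir}/{file_id}.{v}.puffin")
            }
        })
        .collect()
}

// For current_version = 2 this yields three paths:
//   {dir}/{file_id}.puffin, {dir}/{file_id}.1.puffin, {dir}/{file_id}.2.puffin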
fn build_index_file_path(&self, file_id: RegionFileId) -> String; + /// Creates index file path of given index id (with version support). + fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String; + /// Creates SST file path of given file id. fn build_sst_file_path(&self, file_id: RegionFileId) -> String; } @@ -575,7 +587,16 @@ impl WriteCachePathProvider { impl FilePathProvider for WriteCachePathProvider { fn build_index_file_path(&self, file_id: RegionFileId) -> String { - let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin); + let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin(0)); + self.file_cache.cache_file_path(puffin_key) + } + + fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String { + let puffin_key = IndexKey::new( + index_id.region_id(), + index_id.file_id(), + FileType::Puffin(index_id.version), + ); self.file_cache.cache_file_path(puffin_key) } @@ -605,7 +626,11 @@ impl RegionFilePathFactory { impl FilePathProvider for RegionFilePathFactory { fn build_index_file_path(&self, file_id: RegionFileId) -> String { - location::index_file_path(&self.table_dir, file_id, self.path_type) + location::index_file_path_legacy(&self.table_dir, file_id, self.path_type) + } + + fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String { + location::index_file_path(&self.table_dir, index_id, self.path_type) } fn build_sst_file_path(&self, file_id: RegionFileId) -> String { diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 4e7e842da9..ba0375f4cb 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -44,7 +44,7 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache use crate::cache::write_cache::WriteCacheRef; use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; use crate::read::Batch; -use crate::sst::file::RegionFileId; +use crate::sst::file::{RegionFileId, RegionIndexId}; use crate::sst::parquet::reader::MetadataCacheMetrics; /// Metrics type key for sst meta. @@ -180,7 +180,7 @@ impl CacheStrategy { } /// Calls [CacheManager::evict_puffin_cache()]. - pub async fn evict_puffin_cache(&self, file_id: RegionFileId) { + pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) { match self { CacheStrategy::EnableAll(cache_manager) => { cache_manager.evict_puffin_cache(file_id).await @@ -400,7 +400,7 @@ impl CacheManager { } /// Evicts every puffin-related cache entry for the given file. 
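In the write-cache provider above, the unversioned `build_index_file_path` is simply the versioned lookup with version 0 (`FileType::Puffin(0)`), while the object-store provider routes the two methods to `index_file_path_legacy` and `index_file_path`. One way to picture that relationship is a trait with a default method, sketched below with shortened, illustrative names; the real trait declares both methods as required:

trait IndexPathSketch {
    // Versioned variant: used wherever a rebuilt index may carry a non-zero version.
    fn versioned_path(&self, file_id: &str, version: u64) -> String;

    // Legacy entry point kept for existing call sites; equivalent to version 0 here.
    fn legacy_path(&self, file_id: &str) -> String {
        self.versioned_path(file_id, 0)
    }
}

struct CacheDirSketch(String);

impl IndexPathSketch for CacheDirSketch {
    fn versioned_path(&self, file_id: &str, version: u64) -> String {
        // Mirrors the "{version}.puffin" cache-key suffix used by the write cache.
        format!("{}/{file_id}.{version}.puffin", self.0)
    }
}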
- pub async fn evict_puffin_cache(&self, file_id: RegionFileId) { + pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) { if let Some(cache) = &self.bloom_filter_index_cache { cache.invalidate_file(file_id.file_id()); } @@ -422,7 +422,7 @@ impl CacheManager { .remove(IndexKey::new( file_id.region_id(), file_id.file_id(), - FileType::Puffin, + FileType::Puffin(file_id.version), )) .await; } @@ -949,7 +949,7 @@ mod tests { let cache = Arc::new(cache); let region_id = RegionId::new(1, 1); - let region_file_id = RegionFileId::new(region_id, FileId::random()); + let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0); let column_id: ColumnId = 1; let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone(); @@ -957,16 +957,21 @@ mod tests { let result_cache = cache.index_result_cache().unwrap(); let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone(); - let bloom_key = (region_file_id.file_id(), column_id, Tag::Skipping); + let bloom_key = ( + index_id.file_id(), + index_id.version, + column_id, + Tag::Skipping, + ); bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default())); inverted_cache.put_metadata( - region_file_id.file_id(), + (index_id.file_id(), index_id.version), Arc::new(InvertedIndexMetas::default()), ); let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new())); let selection = Arc::new(RowGroupSelection::default()); - result_cache.put(predicate.clone(), region_file_id.file_id(), selection); - let file_id_str = region_file_id.to_string(); + result_cache.put(predicate.clone(), index_id.file_id(), selection); + let file_id_str = index_id.to_string(); let metadata = Arc::new(FileMetadata { blobs: Vec::new(), properties: HashMap::new(), @@ -976,40 +981,32 @@ mod tests { assert!(bloom_cache.get_metadata(bloom_key).is_some()); assert!( inverted_cache - .get_metadata(region_file_id.file_id()) - .is_some() - ); - assert!( - result_cache - .get(&predicate, region_file_id.file_id()) + .get_metadata((index_id.file_id(), index_id.version)) .is_some() ); + assert!(result_cache.get(&predicate, index_id.file_id()).is_some()); assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some()); - cache.evict_puffin_cache(region_file_id).await; + cache.evict_puffin_cache(index_id).await; assert!(bloom_cache.get_metadata(bloom_key).is_none()); assert!( inverted_cache - .get_metadata(region_file_id.file_id()) - .is_none() - ); - assert!( - result_cache - .get(&predicate, region_file_id.file_id()) + .get_metadata((index_id.file_id(), index_id.version)) .is_none() ); + assert!(result_cache.get(&predicate, index_id.file_id()).is_none()); assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none()); // Refill caches and evict via CacheStrategy to ensure delegation works. 
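The eviction path above drops index caches by file id alone (the inverted-index cache's `invalidate_file` later in this patch matches only the file component of its key and ignores the version), so every cached version of a file's index is evicted together. A minimal, self-contained sketch of that rule with stand-in types rather than the mito2 cache types:

use std::collections::HashMap;

// Stand-ins; not the real cache key types.
type FileId = u64;
type IndexVersion = u64;
type ColumnId = u32;

fn main() {
    let mut cache: HashMap<(FileId, IndexVersion, ColumnId), &'static str> = HashMap::new();
    cache.insert((1, 0, 7), "metadata for the original index");
    cache.insert((1, 1, 7), "metadata for the rebuilt index");
    cache.insert((2, 0, 7), "metadata for an unrelated file");

    // Invalidate by file id only: both versions belonging to file 1 are dropped.
    let evicted: FileId = 1;
    cache.retain(|key, _| key.0 != evicted);

    assert_eq!(cache.len(), 1);
    assert!(cache.contains_key(&(2, 0, 7)));
}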
bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default())); inverted_cache.put_metadata( - region_file_id.file_id(), + (index_id.file_id(), index_id.version), Arc::new(InvertedIndexMetas::default()), ); result_cache.put( predicate.clone(), - region_file_id.file_id(), + index_id.file_id(), Arc::new(RowGroupSelection::default()), ); puffin_metadata_cache.put_metadata( @@ -1021,19 +1018,15 @@ mod tests { ); let strategy = CacheStrategy::EnableAll(cache.clone()); - strategy.evict_puffin_cache(region_file_id).await; + strategy.evict_puffin_cache(index_id).await; assert!(bloom_cache.get_metadata(bloom_key).is_none()); assert!( inverted_cache - .get_metadata(region_file_id.file_id()) - .is_none() - ); - assert!( - result_cache - .get(&predicate, region_file_id.file_id()) + .get_metadata((index_id.file_id(), index_id.version)) .is_none() ); + assert!(result_cache.get(&predicate, index_id.file_id()).is_none()); assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none()); } } diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index a211597602..a7fe1dfb35 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -71,7 +71,7 @@ impl FileCacheInner { fn memory_index(&self, file_type: FileType) -> &Cache { match file_type { FileType::Parquet => &self.parquet_index, - FileType::Puffin => &self.puffin_index, + FileType::Puffin { .. } => &self.puffin_index, } } @@ -130,7 +130,7 @@ impl FileCacheInner { // Track sizes separately for each file type match key.file_type { FileType::Parquet => parquet_size += size, - FileType::Puffin => puffin_size += size, + FileType::Puffin { .. } => puffin_size += size, } } // The metrics is a signed int gauge so we can updates it finally. @@ -178,7 +178,7 @@ impl FileCacheInner { let timer = WRITE_CACHE_DOWNLOAD_ELAPSED .with_label_values(&[match file_type { FileType::Parquet => "download_parquet", - FileType::Puffin => "download_puffin", + FileType::Puffin { .. } => "download_puffin", }]) .start_timer(); @@ -607,7 +607,7 @@ impl fmt::Display for IndexKey { "{}.{}.{}", self.region_id.as_u64(), self.file_id, - self.file_type.as_str() + self.file_type ) } } @@ -618,7 +618,16 @@ pub enum FileType { /// Parquet file. Parquet, /// Puffin file. - Puffin, + Puffin(u64), +} + +impl fmt::Display for FileType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + FileType::Parquet => write!(f, "parquet"), + FileType::Puffin(version) => write!(f, "{}.puffin", version), + } + } } impl FileType { @@ -626,16 +635,16 @@ impl FileType { fn parse(s: &str) -> Option { match s { "parquet" => Some(FileType::Parquet), - "puffin" => Some(FileType::Puffin), - _ => None, - } - } - - /// Converts the file type to string. 
- fn as_str(&self) -> &'static str { - match self { - FileType::Parquet => "parquet", - FileType::Puffin => "puffin", + "puffin" => Some(FileType::Puffin(0)), + _ => { + // if post-fix with .puffin, try to parse the version + if let Some(version_str) = s.strip_suffix(".puffin") { + let version = version_str.parse::().ok()?; + Some(FileType::Puffin(version)) + } else { + None + } + } } } @@ -643,7 +652,7 @@ impl FileType { fn metric_label(&self) -> &'static str { match self { FileType::Parquet => FILE_TYPE, - FileType::Puffin => INDEX_TYPE, + FileType::Puffin(_) => INDEX_TYPE, } } } @@ -921,6 +930,15 @@ mod tests { IndexKey::new(region_id, file_id, FileType::Parquet), parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap() ); + assert_eq!( + IndexKey::new(region_id, file_id, FileType::Puffin(0)), + parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap() + ); + assert_eq!( + IndexKey::new(region_id, file_id, FileType::Puffin(42)), + parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin") + .unwrap() + ); assert!(parse_index_key("").is_none()); assert!(parse_index_key(".").is_none()); assert!(parse_index_key("5299989643269").is_none()); diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index 0dbb6c403e..fc39192c98 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -25,6 +25,7 @@ use store_api::storage::{ColumnId, FileId}; use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey}; use crate::metrics::{CACHE_HIT, CACHE_MISS}; +use crate::sst::file::IndexVersion; const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index"; @@ -35,8 +36,10 @@ pub enum Tag { Fulltext, } +pub type BloomFilterIndexKey = (FileId, IndexVersion, ColumnId, Tag); + /// Cache for bloom filter index. -pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>; +pub type BloomFilterIndexCache = IndexCache; pub type BloomFilterIndexCacheRef = Arc; impl BloomFilterIndexCache { @@ -59,11 +62,9 @@ impl BloomFilterIndexCache { } /// Calculates weight for bloom filter index metadata. -fn bloom_filter_index_metadata_weight( - k: &(FileId, ColumnId, Tag), - meta: &Arc, -) -> u32 { +fn bloom_filter_index_metadata_weight(k: &BloomFilterIndexKey, meta: &Arc) -> u32 { let base = k.0.as_bytes().len() + + std::mem::size_of::() + std::mem::size_of::() + std::mem::size_of::() + std::mem::size_of::(); @@ -75,16 +76,14 @@ fn bloom_filter_index_metadata_weight( } /// Calculates weight for bloom filter index content. -fn bloom_filter_index_content_weight( - (k, _): &((FileId, ColumnId, Tag), PageKey), - v: &Bytes, -) -> u32 { +fn bloom_filter_index_content_weight((k, _): &(BloomFilterIndexKey, PageKey), v: &Bytes) -> u32 { (k.0.as_bytes().len() + std::mem::size_of::() + v.len()) as u32 } /// Bloom filter index blob reader with cache. pub struct CachedBloomFilterIndexBlobReader { file_id: FileId, + index_version: IndexVersion, column_id: ColumnId, tag: Tag, blob_size: u64, @@ -96,6 +95,7 @@ impl CachedBloomFilterIndexBlobReader { /// Creates a new bloom filter index blob reader with cache. 
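With the `FileType` changes above, the cache-file suffix now carries the index version: a bare `puffin` suffix (pre-existing cache entries) parses as version 0, while versioned entries are written and parsed as `{version}.puffin`. A self-contained sketch of that suffix convention using a stand-in enum rather than the real `FileType`:

#[derive(Debug, PartialEq)]
enum Suffix {
    Parquet,
    Puffin(u64),
}

impl std::fmt::Display for Suffix {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Suffix::Parquet => write!(f, "parquet"),
            // Like the impl above, version 0 still renders as "0.puffin" when formatted;
            // the bare "puffin" form is only accepted on parse, for old entries.
            Suffix::Puffin(version) => write!(f, "{version}.puffin"),
        }
    }
}

fn parse_suffix(s: &str) -> Option<Suffix> {
    match s {
        "parquet" => Some(Suffix::Parquet),
        // Bare "puffin" (pre-existing cache entries) means version 0.
        "puffin" => Some(Suffix::Puffin(0)),
        other => {
            let version = other.strip_suffix(".puffin")?.parse::<u64>().ok()?;
            Some(Suffix::Puffin(version))
        }
    }
}

fn main() {
    assert_eq!(parse_suffix("puffin"), Some(Suffix::Puffin(0)));
    assert_eq!(parse_suffix("42.puffin"), Some(Suffix::Puffin(42)));
    assert_eq!(Suffix::Puffin(42).to_string(), "42.puffin");
    assert_eq!(parse_suffix(&Suffix::Puffin(7).to_string()), Some(Suffix::Puffin(7)));
    assert!(parse_suffix("not-a-number.puffin").is_none());
}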
pub fn new( file_id: FileId, + index_version: IndexVersion, column_id: ColumnId, tag: Tag, blob_size: u64, @@ -104,6 +104,7 @@ impl CachedBloomFilterIndexBlobReader { ) -> Self { Self { file_id, + index_version, column_id, tag, blob_size, @@ -126,7 +127,7 @@ impl BloomFilterReader for CachedBloomFilterIndexBl let (result, cache_metrics) = self .cache .get_or_load( - (self.file_id, self.column_id, self.tag), + (self.file_id, self.index_version, self.column_id, self.tag), self.blob_size, offset, size, @@ -161,7 +162,7 @@ impl BloomFilterReader for CachedBloomFilterIndexBl let (page, cache_metrics) = self .cache .get_or_load( - (self.file_id, self.column_id, self.tag), + (self.file_id, self.index_version, self.column_id, self.tag), self.blob_size, range.start, (range.end - range.start) as u32, @@ -191,9 +192,9 @@ impl BloomFilterReader for CachedBloomFilterIndexBl &self, metrics: Option<&mut BloomFilterReadMetrics>, ) -> Result { - if let Some(cached) = self - .cache - .get_metadata((self.file_id, self.column_id, self.tag)) + if let Some(cached) = + self.cache + .get_metadata((self.file_id, self.index_version, self.column_id, self.tag)) { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); if let Some(m) = metrics { @@ -203,7 +204,7 @@ impl BloomFilterReader for CachedBloomFilterIndexBl } else { let meta = self.inner.metadata(metrics).await?; self.cache.put_metadata( - (self.file_id, self.column_id, self.tag), + (self.file_id, self.index_version, self.column_id, self.tag), Arc::new(meta.clone()), ); CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); @@ -223,6 +224,7 @@ mod test { #[test] fn bloom_filter_metadata_weight_counts_vec_contents() { let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); + let version = 0; let column_id: ColumnId = 42; let tag = Tag::Skipping; @@ -246,10 +248,13 @@ mod test { ], }; - let weight = - bloom_filter_index_metadata_weight(&(file_id, column_id, tag), &Arc::new(meta.clone())); + let weight = bloom_filter_index_metadata_weight( + &(file_id, version, column_id, tag), + &Arc::new(meta.clone()), + ); let base = file_id.as_bytes().len() + + std::mem::size_of::() + std::mem::size_of::() + std::mem::size_of::() + std::mem::size_of::(); diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 79509d0796..f99fa0218c 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -26,11 +26,12 @@ use store_api::storage::FileId; use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey}; use crate::metrics::{CACHE_HIT, CACHE_MISS}; +use crate::sst::file::IndexVersion; const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index"; /// Cache for inverted index. -pub type InvertedIndexCache = IndexCache; +pub type InvertedIndexCache = IndexCache<(FileId, IndexVersion), InvertedIndexMetas>; pub type InvertedIndexCacheRef = Arc; impl InvertedIndexCache { @@ -48,23 +49,24 @@ impl InvertedIndexCache { /// Removes all cached entries for the given `file_id`. pub fn invalidate_file(&self, file_id: FileId) { - self.invalidate_if(move |key| *key == file_id); + self.invalidate_if(move |key| key.0 == file_id); } } /// Calculates weight for inverted index metadata. 
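The cached bloom-filter reader above now carries the index version and keys both its page and metadata lookups by the full `(file id, index version, column id, tag)` tuple, so a rebuilt index never shares entries with the version it replaces. A small sketch of that get-or-populate pattern with simplified stand-in types (not the real `IndexCache` API):

use std::collections::HashMap;
use std::sync::Arc;

// Stand-in key: (file id, index version, column id, tag).
type Key = (u64, u64, u32, u8);

fn get_or_load_meta(
    cache: &mut HashMap<Key, Arc<String>>,
    key: Key,
    load: impl FnOnce() -> String,
) -> Arc<String> {
    if let Some(meta) = cache.get(&key) {
        return meta.clone(); // cache hit
    }
    let meta = Arc::new(load()); // cache miss: read from the blob reader
    cache.insert(key, meta.clone());
    meta
}

fn main() {
    let mut cache = HashMap::new();
    let key = (1, 0, 42, 0);
    let meta = get_or_load_meta(&mut cache, key, || "bloom filter meta".to_string());
    assert_eq!(*meta, "bloom filter meta");
    // Second call hits the cache; the loader is not invoked again.
    let again = get_or_load_meta(&mut cache, key, || unreachable!());
    assert!(Arc::ptr_eq(&meta, &again));
}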
-fn inverted_index_metadata_weight(k: &FileId, v: &Arc) -> u32 { - (k.as_bytes().len() + v.encoded_len()) as u32 +fn inverted_index_metadata_weight(k: &(FileId, IndexVersion), v: &Arc) -> u32 { + (k.0.as_bytes().len() + size_of::() + v.encoded_len()) as u32 } /// Calculates weight for inverted index content. -fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 { - (k.as_bytes().len() + v.len()) as u32 +fn inverted_index_content_weight((k, _): &((FileId, IndexVersion), PageKey), v: &Bytes) -> u32 { + (k.0.as_bytes().len() + size_of::() + v.len()) as u32 } /// Inverted index blob reader with cache. pub struct CachedInvertedIndexBlobReader { file_id: FileId, + index_version: IndexVersion, blob_size: u64, inner: R, cache: InvertedIndexCacheRef, @@ -72,9 +74,16 @@ pub struct CachedInvertedIndexBlobReader { impl CachedInvertedIndexBlobReader { /// Creates a new inverted index blob reader with cache. - pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self { + pub fn new( + file_id: FileId, + index_version: IndexVersion, + blob_size: u64, + inner: R, + cache: InvertedIndexCacheRef, + ) -> Self { Self { file_id, + index_version, blob_size, inner, cache, @@ -96,7 +105,7 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead let (result, cache_metrics) = self .cache .get_or_load( - self.file_id, + (self.file_id, self.index_version), self.blob_size, offset, size, @@ -129,7 +138,7 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead let (page, cache_metrics) = self .cache .get_or_load( - self.file_id, + (self.file_id, self.index_version), self.blob_size, range.start, (range.end - range.start) as u32, @@ -156,7 +165,7 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead &self, metrics: Option<&'a mut InvertedIndexReadMetrics>, ) -> Result> { - if let Some(cached) = self.cache.get_metadata(self.file_id) { + if let Some(cached) = self.cache.get_metadata((self.file_id, self.index_version)) { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); if let Some(m) = metrics { m.cache_hit += 1; @@ -164,7 +173,8 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead Ok(cached) } else { let meta = self.inner.metadata(metrics).await?; - self.cache.put_metadata(self.file_id, meta.clone()); + self.cache + .put_metadata((self.file_id, self.index_version), meta.clone()); CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(meta) } @@ -299,6 +309,7 @@ mod test { // Init a test range reader in local fs. 
let mut env = TestEnv::new().await; let file_size = blob.len() as u64; + let index_version = 0; let store = env.init_object_store_manager(); let temp_path = "data"; store.write(temp_path, blob).await.unwrap(); @@ -314,6 +325,7 @@ mod test { let reader = InvertedIndexBlobReader::new(range_reader); let cached_reader = CachedInvertedIndexBlobReader::new( FileId::random(), + index_version, file_size, reader, Arc::new(InvertedIndexCache::new(8192, 8192, 50)), @@ -450,7 +462,7 @@ mod test { let (read, _cache_metrics) = cached_reader .cache .get_or_load( - cached_reader.file_id, + (cached_reader.file_id, cached_reader.index_version), file_size, offset, size, diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 841de3bde0..2e53a0b999 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -215,6 +215,7 @@ impl WriteCache { puffin_manager: self .puffin_manager_factory .build(store.clone(), path_provider.clone()), + write_cache_enabled: true, intermediate_manager: self.intermediate_manager.clone(), index_options: write_request.index_options, inverted_index_config: write_request.inverted_index_config, @@ -266,7 +267,7 @@ impl WriteCache { upload_tracker.push_uploaded_file(parquet_path); if sst.index_metadata.file_size > 0 { - let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin); + let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0)); let puffin_path = upload_request .dest_path_provider .build_index_file_path(RegionFileId::new(region_id, sst.file_id)); @@ -439,7 +440,11 @@ impl UploadTracker { file_cache.remove(parquet_key).await; if sst.index_metadata.file_size > 0 { - let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin); + let puffin_key = IndexKey::new( + self.region_id, + sst.file_id, + FileType::Puffin(sst.index_metadata.version), + ); file_cache.remove(puffin_key).await; } } @@ -548,7 +553,7 @@ mod tests { assert_eq!(remote_data.to_vec(), cache_data.to_vec()); // Check write cache contains the index key - let index_key = IndexKey::new(region_id, file_id, FileType::Puffin); + let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0)); assert!(write_cache.file_cache.contains_key(&index_key)); let remote_index_data = mock_store.read(&index_upload_path).await.unwrap(); diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e63ae7f3f6..3945c2d31e 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -399,7 +399,7 @@ impl DefaultCompactor { available_indexes: sst_info.index_metadata.build_available_indexes(), indexes: sst_info.index_metadata.build_indexes(), index_file_size: sst_info.index_metadata.file_size, - index_file_id: None, + index_version: 0, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: max_sequence, diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index 90960b9841..d5cf251a68 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -77,7 +77,7 @@ pub fn new_file_handle_with_size_and_sequence( available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 0, num_row_groups: 0, num_series: 0, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 76c9a37616..798ec5eded 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -135,7 
+135,7 @@ use crate::read::stream::ScanBatchStream; use crate::region::MitoRegionRef; use crate::region::opener::PartitionExprFetcherRef; use crate::request::{RegionEditRequest, WorkerRequest}; -use crate::sst::file::{FileMeta, RegionFileId}; +use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId}; use crate::sst::file_ref::FileReferenceManagerRef; use crate::wal::entry_distributor::{ DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers, @@ -541,22 +541,23 @@ impl MitoEngine { return Vec::new(); }; - let Some(index_file_id) = entry.index_file_id.as_ref() else { - return Vec::new(); - }; - let file_id = match FileId::parse_str(index_file_id) { + let index_version = entry.index_version; + let file_id = match FileId::parse_str(&entry.file_id) { Ok(file_id) => file_id, Err(err) => { warn!( err; "Failed to parse puffin index file id, table_dir: {}, file_id: {}", entry.table_dir, - index_file_id + entry.file_id ); return Vec::new(); } }; - let region_file_id = RegionFileId::new(entry.region_id, file_id); + let region_index_id = RegionIndexId::new( + RegionFileId::new(entry.region_id, file_id), + index_version, + ); let context = IndexEntryContext { table_dir: &entry.table_dir, index_file_path: index_file_path.as_str(), @@ -565,7 +566,7 @@ impl MitoEngine { region_number: entry.region_number, region_group: entry.region_group, region_sequence: entry.region_sequence, - file_id: index_file_id, + file_id: &entry.file_id, index_file_size: entry.index_file_size, node_id, }; @@ -576,7 +577,7 @@ impl MitoEngine { collect_index_entries_from_puffin( manager, - region_file_id, + region_index_id, context, bloom_filter_cache, inverted_index_cache, diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index f17726abef..88303d3f70 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -861,9 +861,10 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) { #[tokio::test] async fn test_list_ssts() { test_list_ssts_with_format(false, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 
9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#" +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# , +r#" StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } @@ -871,9 +872,10 @@ StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_s StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await; test_list_ssts_with_format(true, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), 
table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#" +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, +r#" StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } @@ -945,13 +947,13 @@ async fn test_list_ssts_with_format( .index_file_path .map(|p| p.replace(&e.file_id, "")); e.file_id = "".to_string(); - e.index_file_id = e.index_file_id.map(|_| "".to_string()); + e.index_version = 0; format!("\n{:?}", e) }) .sorted() .collect::>() .join(""); - assert_eq!(debug_format, expected_manifest_ssts,); + assert_eq!(debug_format, expected_manifest_ssts, "{}", debug_format); // list from storage let storage_entries = engine @@ -969,7 +971,7 @@ async fn test_list_ssts_with_format( .sorted() .collect::>() .join(""); - assert_eq!(debug_format, expected_storage_ssts,); + assert_eq!(debug_format, expected_storage_ssts, "{}", debug_format); } #[tokio::test] diff --git a/src/mito2/src/engine/index_build_test.rs b/src/mito2/src/engine/index_build_test.rs index 7c5ec19c7f..441f93d1bc 100644 --- a/src/mito2/src/engine/index_build_test.rs +++ b/src/mito2/src/engine/index_build_test.rs @@ -55,10 +55,10 @@ async fn num_of_index_files(engine: &MitoEngine, scanner: &Scanner, region_id: R return 0; } let mut index_files_count: usize = 0; - for region_file_id in scanner.file_ids() { + for region_index_id in 
scanner.index_ids() { let index_path = location::index_file_path( access_layer.table_dir(), - region_file_id, + region_index_id, access_layer.path_type(), ); if access_layer diff --git a/src/mito2/src/engine/puffin_index.rs b/src/mito2/src/engine/puffin_index.rs index 925d547eb9..281b619bc5 100644 --- a/src/mito2/src/engine/puffin_index.rs +++ b/src/mito2/src/engine/puffin_index.rs @@ -32,7 +32,7 @@ use crate::cache::index::bloom_filter_index::{ BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag, }; use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef}; -use crate::sst::file::RegionFileId; +use crate::sst::file::RegionIndexId; use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE; use crate::sst::index::fulltext_index::{ INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE, @@ -66,14 +66,14 @@ pub(crate) struct IndexEntryContext<'a> { /// Collect index metadata entries present in the SST puffin file. pub(crate) async fn collect_index_entries_from_puffin( manager: SstPuffinManager, - region_file_id: RegionFileId, + region_index_id: RegionIndexId, context: IndexEntryContext<'_>, bloom_filter_cache: Option, inverted_index_cache: Option, ) -> Vec { let mut entries = Vec::new(); - let reader = match manager.reader(®ion_file_id).await { + let reader = match manager.reader(®ion_index_id).await { Ok(reader) => reader, Err(err) => { warn!( @@ -104,7 +104,7 @@ pub(crate) async fn collect_index_entries_from_puffin( Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => { let bloom_meta = try_read_bloom_meta( &reader, - region_file_id, + region_index_id, blob.blob_type.as_str(), target_key, bloom_filter_cache.as_ref(), @@ -130,7 +130,7 @@ pub(crate) async fn collect_index_entries_from_puffin( Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => { let bloom_meta = try_read_bloom_meta( &reader, - region_file_id, + region_index_id, blob.blob_type.as_str(), target_key, bloom_filter_cache.as_ref(), @@ -172,7 +172,7 @@ pub(crate) async fn collect_index_entries_from_puffin( Some(BlobIndexTypeTargetKey::Inverted) => { let mut inverted_entries = collect_inverted_entries( &reader, - region_file_id, + region_index_id, inverted_index_cache.as_ref(), &context, ) @@ -188,12 +188,12 @@ pub(crate) async fn collect_index_entries_from_puffin( async fn collect_inverted_entries( reader: &SstPuffinReader, - region_file_id: RegionFileId, + region_index_id: RegionIndexId, cache: Option<&InvertedIndexCacheRef>, context: &IndexEntryContext<'_>, ) -> Vec { // Read the inverted index blob and surface its per-column metadata entries. 
- let file_id = region_file_id.file_id(); + let file_id = region_index_id.file_id(); let guard = match reader.blob(INVERTED_BLOB_TYPE).await { Ok(guard) => guard, @@ -229,6 +229,7 @@ async fn collect_inverted_entries( let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) { let reader = CachedInvertedIndexBlobReader::new( file_id, + region_index_id.version, blob_size, InvertedIndexBlobReader::new(blob_reader), cache.clone(), @@ -289,7 +290,7 @@ fn build_inverted_entries( async fn try_read_bloom_meta( reader: &SstPuffinReader, - region_file_id: RegionFileId, + region_index_id: RegionIndexId, blob_type: &str, target_key: &str, cache: Option<&BloomFilterIndexCacheRef>, @@ -311,7 +312,8 @@ async fn try_read_bloom_meta( let result = match (cache, column_id, blob_size) { (Some(cache), Some(column_id), Some(blob_size)) => { CachedBloomFilterIndexBlobReader::new( - region_file_id.file_id(), + region_index_id.file_id(), + region_index_id.version, column_id, tag, blob_size, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 137e703e39..eff6529223 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -643,7 +643,7 @@ impl RegionFlushTask { available_indexes: sst_info.index_metadata.build_available_indexes(), indexes: sst_info.index_metadata.build_indexes(), index_file_size: sst_info.index_metadata.file_size, - index_file_id: None, + index_version: 0, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: NonZeroU64::new(max_sequence), diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index c61b569cef..06dc13f5fa 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -330,10 +330,9 @@ impl LocalGcWorker { // TODO(discord9): for now, ignore async index file as it's design is not stable, need to be improved once // index file design is stable - let file_pairs: Vec<(FileId, FileId)> = unused_files - .iter() - .map(|file_id| (*file_id, *file_id)) - .collect(); + let file_pairs: Vec<(FileId, u64)> = + unused_files.iter().map(|file_id| (*file_id, 0)).collect(); + // TODO(discord9): gc worker need another major refactor to support versioned index files debug!( "Found {} unused index files to delete for region {}", @@ -354,7 +353,7 @@ impl LocalGcWorker { Ok(unused_files) } - async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> { + async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, u64)]) -> Result<()> { delete_files( region_id, file_ids, diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index da063fe242..d709bb96f0 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -247,7 +247,7 @@ async fn checkpoint_with_different_compression_types() { available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 0, num_row_groups: 0, sequence: None, @@ -312,7 +312,7 @@ fn generate_action_lists(num: usize) -> (Vec, Vec) available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 0, num_row_groups: 0, sequence: None, diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 0fa9a0eef9..9ec734a69d 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -135,6 +135,14 @@ impl Scanner { } } + pub(crate) fn index_ids(&self) -> Vec { + match self { + 
Scanner::Seq(seq_scan) => seq_scan.input().index_ids(), + Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(), + Scanner::Series(series_scan) => series_scan.input().index_ids(), + } + } + /// Sets the target partitions for the scanner. It can controls the parallelism of the scanner. pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) { use store_api::region_engine::{PrepareRequest, RegionScanner}; @@ -1162,6 +1170,10 @@ impl ScanInput { pub(crate) fn file_ids(&self) -> Vec { self.files.iter().map(|file| file.file_id()).collect() } + + pub(crate) fn index_ids(&self) -> Vec { + self.files.iter().map(|file| file.index_id()).collect() + } } fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode { diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 3f0f8eb941..661c1d876c 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -617,17 +617,16 @@ impl MitoRegion { .map(|meta| { let region_id = self.region_id; let origin_region_id = meta.region_id; - let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0 + let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0 { - let index_file_path = - index_file_path(table_dir, meta.index_file_id(), path_type); + let index_file_path = index_file_path(table_dir, meta.index_id(), path_type); ( - Some(meta.index_file_id().file_id().to_string()), + meta.index_version, Some(index_file_path), Some(meta.index_file_size), ) } else { - (None, None, None) + (0, None, None) }; let visible = visible_ssts.contains(&meta.file_id); ManifestSstEntry { @@ -638,7 +637,7 @@ impl MitoRegion { region_group: region_id.region_group(), region_sequence: region_id.region_sequence(), file_id: meta.file_id.to_string(), - index_file_id, + index_version, level: meta.level, file_path: sst_file_path(table_dir, meta.file_id(), path_type), file_size: meta.file_size, diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index ffc93ad7c4..128cf6bad6 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -63,7 +63,7 @@ use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::FormatType; -use crate::sst::file::RegionFileId; +use crate::sst::file::{RegionFileId, RegionIndexId}; use crate::sst::file_purger::{FilePurgerRef, create_file_purger}; use crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; @@ -867,8 +867,8 @@ impl RegionLoadCacheTask { if file_meta.exists_index() { let puffin_key = IndexKey::new( file_meta.region_id, - file_meta.index_file_id().file_id(), - FileType::Puffin, + file_meta.file_id, + FileType::Puffin(file_meta.index_version), ); if !file_cache.contains_key(&puffin_key) { @@ -925,12 +925,18 @@ impl RegionLoadCacheTask { break; } - let index_remote_path = location::index_file_path( - table_dir, + let index_version = if let FileType::Puffin(version) = puffin_key.file_type { + version + } else { + unreachable!("`files_to_download` should only contains Puffin files"); + }; + let index_id = RegionIndexId::new( RegionFileId::new(puffin_key.region_id, puffin_key.file_id), - path_type, + index_version, ); + let index_remote_path = location::index_file_path(table_dir, index_id, path_type); + match file_cache .download(puffin_key, &index_remote_path, object_store, file_size) .await diff --git a/src/mito2/src/remap_manifest.rs 
b/src/mito2/src/remap_manifest.rs index 79f816bb13..1191349b04 100644 --- a/src/mito2/src/remap_manifest.rs +++ b/src/mito2/src/remap_manifest.rs @@ -428,7 +428,7 @@ mod tests { available_indexes: SmallVec::new(), indexes: Default::default(), index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 100, num_row_groups: 1, sequence: NonZeroU64::new(1), diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index caead6e3f4..bd12720011 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -82,6 +82,8 @@ pub type Level = u8; pub const MAX_LEVEL: Level = 2; /// Type to store index types for a column. pub type IndexTypes = SmallVec<[IndexType; 4]>; +/// Index version +pub type IndexVersion = u64; /// Cross-region file id. /// @@ -117,6 +119,41 @@ impl fmt::Display for RegionFileId { } } +/// Unique identifier for an index file, combining the SST file ID and the index version. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct RegionIndexId { + pub file_id: RegionFileId, + pub version: IndexVersion, +} + +impl RegionIndexId { + pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self { + Self { file_id, version } + } + + pub fn region_id(&self) -> RegionId { + self.file_id.region_id + } + + pub fn file_id(&self) -> FileId { + self.file_id.file_id + } +} + +impl fmt::Display for RegionIndexId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.version == 0 { + write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id) + } else { + write!( + f, + "{}/{}.{}", + self.file_id.region_id, self.file_id.file_id, self.version + ) + } + } +} + /// Time range (min and max timestamps) of a SST file. /// Both min and max are inclusive. pub type FileTimeRange = (Timestamp, Timestamp); @@ -159,12 +196,10 @@ pub struct FileMeta { pub indexes: Vec, /// Size of the index file. pub index_file_size: u64, - /// File ID of the index file. - /// - /// When this field is None, it means the index file id is the same as the file id. - /// Only meaningful when index_file_size > 0. - /// Used for rebuilding index files. - pub index_file_id: Option, + /// Version of the index file. + /// Used to generate the index file name: "{file_id}.{index_version}.puffin". + /// Default is 0 (which maps to "{file_id}.puffin" for compatibility). + pub index_version: u64, /// Number of rows in the file. /// /// For historical reasons, this field might be missing in old files. Thus @@ -332,14 +367,9 @@ impl FileMeta { RegionFileId::new(self.region_id, self.file_id) } - /// Returns the cross-region index file id. - /// If the index file id is not set, returns the file id. - pub fn index_file_id(&self) -> RegionFileId { - if let Some(index_file_id) = self.index_file_id { - RegionFileId::new(self.region_id, index_file_id) - } else { - self.file_id() - } + /// Returns the RegionIndexId for this file. + pub fn index_id(&self) -> RegionIndexId { + RegionIndexId::new(self.file_id(), self.index_version) } } @@ -376,14 +406,9 @@ impl FileHandle { RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id) } - /// Returns the cross-region index file id. - /// If the index file id is not set, returns the file id. - pub fn index_file_id(&self) -> RegionFileId { - if let Some(index_file_id) = self.inner.meta.index_file_id { - RegionFileId::new(self.inner.meta.region_id, index_file_id) - } else { - self.file_id() - } + /// Returns the RegionIndexId for this file. 
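A quick illustration of the `Display` rule above, using stand-in fields rather than the real `RegionIndexId`: version 0 renders exactly like the pre-existing `region/file` form, while non-zero versions append `.{version}`.

use std::fmt;

// Stand-in mirroring the Display impl above; the real region id has its own format.
struct IdSketch {
    region: u64,
    file: &'static str,
    version: u64,
}

impl fmt::Display for IdSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.version == 0 {
            write!(f, "{}/{}", self.region, self.file)
        } else {
            write!(f, "{}/{}.{}", self.region, self.file, self.version)
        }
    }
}

fn main() {
    let v0 = IdSketch { region: 11, file: "3368731b", version: 0 };
    let v1 = IdSketch { region: 11, file: "3368731b", version: 1 };
    assert_eq!(v0.to_string(), "11/3368731b");   // unchanged legacy form
    assert_eq!(v1.to_string(), "11/3368731b.1"); // rebuilt index, version appended
}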
+ pub fn index_id(&self) -> RegionIndexId { + RegionIndexId::new(self.file_id(), self.inner.meta.index_version) } /// Returns the complete file path of the file. @@ -468,10 +493,15 @@ impl FileHandleInner { } } -/// Delete +/// Delete files for a region. +/// - `region_id`: Region id. +/// - `file_ids`: List of (file id, index version) tuples to delete. +/// - `delete_index`: Whether to delete the index file from the cache. +/// - `access_layer`: Access layer to delete files. +/// - `cache_manager`: Cache manager to remove files from cache. pub async fn delete_files( region_id: RegionId, - file_ids: &[(FileId, FileId)], + file_ids: &[(FileId, u64)], delete_index: bool, access_layer: &AccessLayerRef, cache_manager: &Option, @@ -484,12 +514,12 @@ pub async fn delete_files( } let mut deleted_files = Vec::with_capacity(file_ids.len()); - for (file_id, index_file_id) in file_ids { + for (file_id, index_version) in file_ids { let region_file_id = RegionFileId::new(region_id, *file_id); match access_layer .delete_sst( - &RegionFileId::new(region_id, *file_id), - &RegionFileId::new(region_id, *index_file_id), + ®ion_file_id, + &RegionIndexId::new(region_file_id, *index_version), ) .await { @@ -509,12 +539,16 @@ pub async fn delete_files( deleted_files ); - for (file_id, index_file_id) in file_ids { + for (file_id, index_version) in file_ids { if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) { // Removes index file from the cache. if delete_index { write_cache - .remove(IndexKey::new(region_id, *index_file_id, FileType::Puffin)) + .remove(IndexKey::new( + region_id, + *file_id, + FileType::Puffin(*index_version), + )) .await; } @@ -527,11 +561,14 @@ pub async fn delete_files( // Purges index content in the stager. if let Err(e) = access_layer .puffin_manager_factory() - .purge_stager(RegionFileId::new(region_id, *index_file_id)) + .purge_stager(RegionIndexId::new( + RegionFileId::new(region_id, *file_id), + *index_version, + )) .await { - error!(e; "Failed to purge stager with index file, file_id: {}, region: {}", - index_file_id, region_id); + error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}", + file_id, index_version, region_id); } } Ok(()) @@ -563,7 +600,7 @@ mod tests { created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), }], index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 0, num_row_groups: 0, sequence: None, @@ -614,7 +651,7 @@ mod tests { created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), }], index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 0, num_row_groups: 0, sequence: None, diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index bf1c2ee5c3..fd405896b0 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -129,7 +129,7 @@ impl LocalFilePurger { if let Err(e) = self.scheduler.schedule(Box::pin(async move { if let Err(e) = delete_files( file_meta.region_id, - &[(file_meta.file_id, file_meta.index_file_id().file_id())], + &[(file_meta.file_id, file_meta.index_id().version)], file_meta.exists_index(), &sst_layer, &cache_manager, @@ -187,6 +187,7 @@ mod tests { use crate::schedule::scheduler::{LocalScheduler, Scheduler}; use crate::sst::file::{ ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId, + RegionIndexId, }; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -237,7 
+238,7 @@ mod tests { available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 0, num_row_groups: 0, sequence: None, @@ -263,6 +264,7 @@ mod tests { let dir_path = dir.path().display().to_string(); let builder = Fs::default().root(&dir_path); let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random()); + let index_file_id = RegionIndexId::new(sst_file_id, 0); let sst_dir = "table1"; let index_aux_path = dir.path().join("index_aux"); @@ -285,7 +287,7 @@ mod tests { let path = location::sst_file_path(sst_dir, sst_file_id, layer.path_type()); object_store.write(&path, vec![0; 4096]).await.unwrap(); - let index_path = location::index_file_path(sst_dir, sst_file_id, layer.path_type()); + let index_path = location::index_file_path(sst_dir, index_file_id, layer.path_type()); object_store .write(&index_path, vec![0; 4096]) .await @@ -309,7 +311,7 @@ mod tests { created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), }], index_file_size: 4096, - index_file_id: None, + index_version: 0, num_rows: 1024, num_row_groups: 1, sequence: NonZeroU64::new(4096), diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index e3cc38640c..5cb7dc8c88 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -230,7 +230,7 @@ mod tests { created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), }], index_file_size: 4096, - index_file_id: None, + index_version: 0, num_rows: 1024, num_row_groups: 1, sequence: NonZeroU64::new(4096), diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 2e64ca426a..12f3d9f1d7 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -61,7 +61,7 @@ use crate::request::{ }; use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::{ - ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, + ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId, }; use crate::sst::file_purger::FilePurgerRef; use crate::sst::index::fulltext_index::creator::FulltextIndexer; @@ -81,6 +81,8 @@ pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index"; pub struct IndexOutput { /// Size of the file. pub file_size: u64, + /// Index version. + pub version: u64, /// Inverted index output. pub inverted_index: InvertedIndexOutput, /// Fulltext index output. @@ -163,7 +165,9 @@ pub type BloomFilterOutput = IndexBaseOutput; pub struct Indexer { file_id: FileId, region_id: RegionId, + index_version: u64, puffin_manager: Option, + write_cache_enabled: bool, inverted_indexer: Option, last_mem_inverted_index: usize, fulltext_indexer: Option, @@ -236,7 +240,7 @@ impl Indexer { #[async_trait::async_trait] pub trait IndexerBuilder { /// Builds indexer of given file id to [index_file_path]. 
- async fn build(&self, file_id: FileId) -> Indexer; + async fn build(&self, file_id: FileId, index_version: u64) -> Indexer; } #[derive(Clone)] pub(crate) struct IndexerBuilderImpl { @@ -244,6 +248,7 @@ pub(crate) struct IndexerBuilderImpl { pub(crate) metadata: RegionMetadataRef, pub(crate) row_group_size: usize, pub(crate) puffin_manager: SstPuffinManager, + pub(crate) write_cache_enabled: bool, pub(crate) intermediate_manager: IntermediateManager, pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, @@ -254,10 +259,12 @@ pub(crate) struct IndexerBuilderImpl { #[async_trait::async_trait] impl IndexerBuilder for IndexerBuilderImpl { /// Sanity check for arguments and create a new [Indexer] if arguments are valid. - async fn build(&self, file_id: FileId) -> Indexer { + async fn build(&self, file_id: FileId, index_version: u64) -> Indexer { let mut indexer = Indexer { file_id, region_id: self.metadata.region_id, + index_version, + write_cache_enabled: self.write_cache_enabled, ..Default::default() }; @@ -611,13 +618,20 @@ impl IndexBuildTask { &mut self, version_control: VersionControlRef, ) -> Result { - let index_file_id = if self.file_meta.index_file_size > 0 { - // Generate new file ID if index file exists to avoid overwrite. - FileId::random() + // Determine the new index version + let new_index_version = if self.file_meta.index_file_size > 0 { + // Increment version if index file exists to avoid overwrite. + self.file_meta.index_version + 1 } else { - self.file_meta.file_id + 0 // Default version for new index files }; - let mut indexer = self.indexer_builder.build(index_file_id).await; + + // Use the same file_id but with new version for index file + let index_file_id = self.file_meta.file_id; + let mut indexer = self + .indexer_builder + .build(index_file_id, new_index_version) + .await; // Check SST file existence before building index to avoid failure of parquet reader. if !self.check_sst_file_exists(&version_control).await { @@ -677,10 +691,10 @@ impl IndexBuildTask { } // Upload index file if write cache is enabled. 
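The version-bump decision in the index build task above can be summarized by a small helper; `next_index_version` is a hypothetical name for this sketch only, the task itself computes the value inline as shown in the hunk:

fn next_index_version(current_version: u64, index_file_size: u64) -> u64 {
    if index_file_size > 0 {
        // An index file already exists: bump the version so the rebuild writes a
        // new object instead of overwriting one that readers may still be using.
        current_version + 1
    } else {
        // No index yet: start at version 0, which maps to the legacy file name.
        0
    }
}

fn main() {
    assert_eq!(next_index_version(0, 0), 0);    // first build
    assert_eq!(next_index_version(0, 1024), 1); // first rebuild
    assert_eq!(next_index_version(3, 1024), 4); // subsequent rebuild
}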
- self.maybe_upload_index_file(index_output.clone(), index_file_id) + self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version) .await?; - let worker_request = match self.update_manifest(index_output, index_file_id).await { + let worker_request = match self.update_manifest(index_output, new_index_version).await { Ok(edit) => { let index_build_finished = IndexBuildFinished { region_id: self.file_meta.region_id, @@ -712,6 +726,7 @@ impl IndexBuildTask { &self, output: IndexOutput, index_file_id: FileId, + index_version: u64, ) -> Result<()> { if let Some(write_cache) = &self.write_cache { let file_id = self.file_meta.file_id; @@ -719,12 +734,14 @@ impl IndexBuildTask { let remote_store = self.access_layer.object_store(); let mut upload_tracker = UploadTracker::new(region_id); let mut err = None; - let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin); + let puffin_key = + IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version)); + let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version); let puffin_path = RegionFilePathFactory::new( self.access_layer.table_dir().to_string(), self.access_layer.path_type(), ) - .build_index_file_path(RegionFileId::new(region_id, file_id)); + .build_index_file_path_with_version(index_id); if let Err(e) = write_cache .upload(puffin_key, &puffin_path, remote_store) .await @@ -756,12 +773,12 @@ impl IndexBuildTask { async fn update_manifest( &mut self, output: IndexOutput, - index_file_id: FileId, + new_index_version: u64, ) -> Result { self.file_meta.available_indexes = output.build_available_indexes(); self.file_meta.indexes = output.build_indexes(); self.file_meta.index_file_size = output.file_size; - self.file_meta.index_file_id = Some(index_file_id); + self.file_meta.index_version = new_index_version; let edit = RegionEdit { files_to_add: vec![self.file_meta.clone()], files_to_remove: vec![], @@ -1163,6 +1180,10 @@ mod tests { unreachable!() } + fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String { + unreachable!() + } + fn build_sst_file_path(&self, _file_id: RegionFileId) -> String { unreachable!() } @@ -1236,6 +1257,7 @@ mod tests { metadata, row_group_size: 1024, puffin_manager, + write_cache_enabled: false, intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), @@ -1260,13 +1282,14 @@ mod tests { metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store(), NoopPathProvider), + write_cache_enabled: false, intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random()) + .build(FileId::random(), 0) .await; assert!(indexer.inverted_indexer.is_some()); @@ -1290,6 +1313,7 @@ mod tests { metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store(), NoopPathProvider), + write_cache_enabled: false, intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig { @@ -1299,7 +1323,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random()) + .build(FileId::random(), 0) .await; assert!(indexer.inverted_indexer.is_none()); @@ -1311,6 +1335,7 @@ 
mod tests { metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store(), NoopPathProvider), + write_cache_enabled: false, intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), @@ -1320,7 +1345,7 @@ mod tests { }, bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random()) + .build(FileId::random(), 0) .await; assert!(indexer.inverted_indexer.is_some()); @@ -1332,6 +1357,7 @@ mod tests { metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store(), NoopPathProvider), + write_cache_enabled: false, intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), @@ -1341,7 +1367,7 @@ mod tests { ..Default::default() }, } - .build(FileId::random()) + .build(FileId::random(), 0) .await; assert!(indexer.inverted_indexer.is_some()); @@ -1365,13 +1391,14 @@ mod tests { metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store(), NoopPathProvider), + write_cache_enabled: false, intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random()) + .build(FileId::random(), 0) .await; assert!(indexer.inverted_indexer.is_none()); @@ -1388,13 +1415,14 @@ mod tests { metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store(), NoopPathProvider), + write_cache_enabled: false, intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random()) + .build(FileId::random(), 0) .await; assert!(indexer.inverted_indexer.is_some()); @@ -1411,13 +1439,14 @@ mod tests { metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store(), NoopPathProvider), + write_cache_enabled: false, intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random()) + .build(FileId::random(), 0) .await; assert!(indexer.inverted_indexer.is_some()); @@ -1441,13 +1470,14 @@ mod tests { metadata, row_group_size: 0, puffin_manager: factory.build(mock_object_store(), NoopPathProvider), + write_cache_enabled: false, intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build(FileId::random()) + .build(FileId::random(), 0) .await; assert!(indexer.inverted_indexer.is_none()); @@ -1619,7 +1649,7 @@ mod tests { let puffin_path = location::index_file_path( env.access_layer.table_dir(), - RegionFileId::new(region_id, file_meta.file_id), + RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0), env.access_layer.path_type(), ); @@ -1761,6 +1791,7 @@ mod tests { metadata: metadata.clone(), row_group_size: 1024, puffin_manager: write_cache.build_puffin_manager().clone(), + 
write_cache_enabled: true, intermediate_manager: write_cache.intermediate_manager().clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), @@ -1812,7 +1843,11 @@ mod tests { } // The write cache should contain the uploaded index file. - let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin); + let index_key = IndexKey::new( + region_id, + file_meta.file_id, + FileType::Puffin(sst_info.index_metadata.version), + ); assert!(write_cache.file_cache().contains_key(&index_key)); } diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index ec4fb6125d..342c147c0e 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -44,7 +44,7 @@ use crate::error::{ Result, }; use crate::metrics::INDEX_APPLY_ELAPSED; -use crate::sst::file::RegionFileId; +use crate::sst::file::RegionIndexId; use crate::sst::index::TYPE_BLOOM_FILTER_INDEX; use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder; @@ -200,7 +200,7 @@ impl BloomFilterIndexApplier { /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, row_groups: impl Iterator, mut metrics: Option<&mut BloomFilterIndexApplyMetrics>, @@ -242,6 +242,7 @@ impl BloomFilterIndexApplier { } let reader = CachedBloomFilterIndexBlobReader::new( file_id.file_id(), + file_id.version, *column_id, Tag::Skipping, blob_size, @@ -286,7 +287,7 @@ impl BloomFilterIndexApplier { /// Returus `None` if the column does not have an index. async fn blob_reader( &self, - file_id: RegionFileId, + file_id: RegionIndexId, column_id: ColumnId, file_size_hint: Option, metrics: Option<&mut BloomFilterIndexApplyMetrics>, @@ -328,7 +329,7 @@ impl BloomFilterIndexApplier { /// Creates a blob reader from the cached index file async fn cached_blob_reader( &self, - file_id: RegionFileId, + file_id: RegionIndexId, column_id: ColumnId, file_size_hint: Option, ) -> Result> { @@ -336,7 +337,11 @@ impl BloomFilterIndexApplier { return Ok(None); }; - let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin); + let index_key = IndexKey::new( + file_id.region_id(), + file_id.file_id(), + FileType::Puffin(file_id.version), + ); if file_cache.get(index_key).await.is_none() { return Ok(None); }; @@ -369,7 +374,7 @@ impl BloomFilterIndexApplier { /// Creates a blob reader from the remote index file async fn remote_blob_reader( &self, - file_id: RegionFileId, + file_id: RegionIndexId, column_id: ColumnId, file_size_hint: Option, ) -> Result { @@ -446,6 +451,7 @@ mod tests { use store_api::storage::FileId; use super::*; + use crate::sst::file::RegionFileId; use crate::sst::index::bloom_filter::creator::BloomFilterIndexer; use crate::sst::index::bloom_filter::creator::tests::{ mock_object_store, mock_region_metadata, new_batch, new_intm_mgr, @@ -457,7 +463,7 @@ mod tests { object_store: ObjectStore, metadata: &RegionMetadata, puffin_manager_factory: PuffinManagerFactory, - file_id: RegionFileId, + file_id: RegionIndexId, ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec>)>> + use<'_> { move |exprs, row_groups| { @@ -514,6 +520,7 @@ mod tests { let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await; let memory_usage_threshold = Some(1024); let file_id = 
RegionFileId::new(region_metadata.region_id, FileId::random()); + let file_id = RegionIndexId::new(file_id, 0); let table_dir = "table_dir".to_string(); let mut indexer = BloomFilterIndexer::new( diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 83ea788204..a7283f9191 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -481,7 +481,7 @@ pub(crate) mod tests { use super::*; use crate::access_layer::FilePathProvider; use crate::read::BatchColumn; - use crate::sst::file::RegionFileId; + use crate::sst::file::{RegionFileId, RegionIndexId}; use crate::sst::index::puffin_manager::PuffinManagerFactory; pub fn mock_object_store() -> ObjectStore { @@ -499,6 +499,10 @@ pub(crate) mod tests { file_id.file_id().to_string() } + fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String { + index_id.file_id.file_id().to_string() + } + fn build_sst_file_path(&self, file_id: RegionFileId) -> String { file_id.file_id().to_string() } @@ -621,6 +625,7 @@ pub(crate) mod tests { let puffin_manager = factory.build(object_store, TestPathProvider); let file_id = RegionFileId::new(region_metadata.region_id, file_id); + let file_id = RegionIndexId::new(file_id, 0); let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap(); let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap(); assert_eq!(row_count, 20); diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 86949bf039..8a98d2e020 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -44,7 +44,7 @@ use crate::error::{ PuffinReadBlobSnafu, Result, }; use crate::metrics::INDEX_APPLY_ELAPSED; -use crate::sst::file::RegionFileId; +use crate::sst::file::RegionIndexId; use crate::sst::index::TYPE_FULLTEXT_INDEX; use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm}; use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY}; @@ -221,7 +221,7 @@ impl FulltextIndexApplier { /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply_fine( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, mut metrics: Option<&mut FulltextIndexApplyMetrics>, ) -> Result>> { @@ -275,7 +275,7 @@ impl FulltextIndexApplier { async fn apply_fine_one_column( &self, file_size_hint: Option, - file_id: RegionFileId, + file_id: RegionIndexId, column_id: ColumnId, request: &FulltextRequest, metrics: Option<&mut FulltextIndexApplyMetrics>, @@ -356,7 +356,7 @@ impl FulltextIndexApplier { /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply_coarse( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, row_groups: impl Iterator, mut metrics: Option<&mut FulltextIndexApplyMetrics>, @@ -405,7 +405,7 @@ impl FulltextIndexApplier { async fn apply_coarse_one_column( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, column_id: ColumnId, terms: &[FulltextTerm], @@ -440,6 +440,7 @@ impl FulltextIndexApplier { .content_length; let reader = CachedBloomFilterIndexBlobReader::new( file_id.file_id(), + file_id.version, column_id, Tag::Fulltext, blob_size, @@ -611,7 +612,7 @@ impl IndexSource { /// Returns `None` if the blob is not found. 
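The appliers above thread the index version into the cached blob readers and into the file-cache key (`FileType::Puffin(version)`), so a rebuilt index never resolves to blobs cached for an older version. A toy illustration of that effect, assuming a plain `HashMap` in place of the real caches and a simplified key without the region id:

use std::collections::HashMap;

// Hypothetical cache keyed by (file id, index version).
#[derive(Hash, PartialEq, Eq)]
struct PuffinKey {
    file_id: String,
    index_version: u64,
}

fn main() {
    let mut cache: HashMap<PuffinKey, Vec<u8>> = HashMap::new();
    cache.insert(
        PuffinKey { file_id: "f1".to_string(), index_version: 0 },
        b"blobs of the old index".to_vec(),
    );

    // After a rebuild bumps the version to 1, lookups miss the stale entry.
    let rebuilt = PuffinKey { file_id: "f1".to_string(), index_version: 1 };
    assert!(cache.get(&rebuilt).is_none());
}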
async fn blob( &self, - file_id: RegionFileId, + file_id: RegionIndexId, key: &str, file_size_hint: Option, metrics: Option<&mut FulltextIndexApplyMetrics>, @@ -649,7 +650,7 @@ impl IndexSource { /// Returns `None` if the directory is not found. async fn dir( &self, - file_id: RegionFileId, + file_id: RegionIndexId, key: &str, file_size_hint: Option, mut metrics: Option<&mut FulltextIndexApplyMetrics>, @@ -699,7 +700,7 @@ impl IndexSource { /// Return reader and whether it is fallbacked to remote store. async fn ensure_reader( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, ) -> Result<(SstPuffinReader, bool)> { match self.build_local_cache(file_id, file_size_hint).await { @@ -711,14 +712,18 @@ impl IndexSource { async fn build_local_cache( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, ) -> Result> { let Some(file_cache) = &self.file_cache else { return Ok(None); }; - let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin); + let index_key = IndexKey::new( + file_id.region_id(), + file_id.file_id(), + FileType::Puffin(file_id.version), + ); if file_cache.get(index_key).await.is_none() { return Ok(None); }; @@ -740,7 +745,7 @@ impl IndexSource { async fn build_remote( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, ) -> Result { let puffin_manager = self diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 32ad178d3b..58c3f1a9bc 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -481,7 +481,7 @@ mod tests { use super::*; use crate::access_layer::RegionFilePathFactory; use crate::read::{Batch, BatchColumn}; - use crate::sst::file::RegionFileId; + use crate::sst::file::{RegionFileId, RegionIndexId}; use crate::sst::index::fulltext_index::applier::FulltextIndexApplier; use crate::sst::index::fulltext_index::applier::builder::{ FulltextQuery, FulltextRequest, FulltextTerm, @@ -672,7 +672,8 @@ mod tests { RegionFilePathFactory::new(table_dir.clone(), PathType::Bare), ); let region_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id); - let mut writer = puffin_manager.writer(®ion_file_id).await.unwrap(); + let index_id = RegionIndexId::new(region_file_id, 0); + let mut writer = puffin_manager.writer(&index_id).await.unwrap(); let _ = indexer.finish(&mut writer).await.unwrap(); writer.finish().await.unwrap(); @@ -723,16 +724,15 @@ mod tests { let backend = backend.clone(); async move { match backend { - FulltextBackend::Tantivy => applier - .apply_fine(region_file_id, None, None) - .await - .unwrap(), + FulltextBackend::Tantivy => { + applier.apply_fine(index_id, None, None).await.unwrap() + } FulltextBackend::Bloom => { let coarse_mask = coarse_mask.unwrap_or_default(); let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i])); // row group id == row id let resp = applier - .apply_coarse(region_file_id, None, row_groups, None) + .apply_coarse(index_id, None, row_groups, None) .await .unwrap(); resp.map(|r| { diff --git a/src/mito2/src/sst/index/indexer/abort.rs b/src/mito2/src/sst/index/indexer/abort.rs index b285b4891d..9a95554f22 100644 --- a/src/mito2/src/sst/index/indexer/abort.rs +++ b/src/mito2/src/sst/index/indexer/abort.rs @@ -14,6 +14,8 @@ use common_telemetry::warn; +use crate::access_layer::TempFileCleaner; +use crate::sst::file::{RegionFileId, RegionIndexId}; use 
crate::sst::index::Indexer; impl Indexer { @@ -22,6 +24,9 @@ impl Indexer { self.do_abort_fulltext_index().await; self.do_abort_bloom_filter().await; self.do_prune_intm_sst_dir().await; + if self.write_cache_enabled { + self.do_abort_clean_fs_temp_dir().await; + } self.puffin_manager = None; } @@ -87,4 +92,18 @@ impl Indexer { ); } } + + async fn do_abort_clean_fs_temp_dir(&mut self) { + let Some(puffin_manager) = &self.puffin_manager else { + return; + }; + let fs_accessor = puffin_manager.file_accessor(); + + let fs_handle = RegionIndexId::new( + RegionFileId::new(self.region_id, self.file_id), + self.index_version, + ) + .to_string(); + TempFileCleaner::clean_atomic_dir_files(fs_accessor.store().store(), &[&fs_handle]).await; + } } diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index 632b0a68d1..4f620dfe42 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -16,7 +16,7 @@ use common_telemetry::{debug, warn}; use puffin::puffin_manager::{PuffinManager, PuffinWriter}; use store_api::storage::ColumnId; -use crate::sst::file::RegionFileId; +use crate::sst::file::{RegionFileId, RegionIndexId}; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount}; use crate::sst::index::{ @@ -56,14 +56,18 @@ impl Indexer { self.do_prune_intm_sst_dir().await; output.file_size = self.do_finish_puffin_writer(writer).await; + output.version = self.index_version; output } async fn build_puffin_writer(&mut self) -> Option { - let puffin_manager = self.puffin_manager.take()?; + let puffin_manager = self.puffin_manager.clone()?; let err = match puffin_manager - .writer(&RegionFileId::new(self.region_id, self.file_id)) + .writer(&RegionIndexId::new( + RegionFileId::new(self.region_id, self.file_id), + self.index_version, + )) .await { Ok(writer) => return Some(writer), diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index f22b886131..dcbb0db58f 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -40,7 +40,7 @@ use crate::error::{ ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result, }; use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE}; -use crate::sst::file::RegionFileId; +use crate::sst::file::RegionIndexId; use crate::sst::index::TYPE_INVERTED_INDEX; use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; @@ -194,7 +194,7 @@ impl InvertedIndexApplier { /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, mut metrics: Option<&mut InvertedIndexApplyMetrics>, ) -> Result { @@ -222,6 +222,7 @@ impl InvertedIndexApplier { let result = if let Some(index_cache) = &self.inverted_index_cache { let mut index_reader = CachedInvertedIndexBlobReader::new( file_id.file_id(), + file_id.version, blob_size, InvertedIndexBlobReader::new(blob), index_cache.clone(), @@ -268,14 +269,18 @@ impl InvertedIndexApplier { /// Creates a blob reader from the cached index file. 
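Because the puffin writer handle is now a `RegionIndexId`, the written object lands at the versioned path resolved through `build_index_file_path_with_version`. A rough sketch of such a resolution under the `PathType::Bare` layout; the trait and struct names here are illustrative stand-ins, not the crate's `FilePathProvider`:

// Hypothetical path provider: the writer asks for a (file id, version) pair
// instead of assuming "{file_id}.puffin".
trait IndexPathProvider {
    fn index_file_path(&self, file_id: &str, version: u64) -> String;
}

struct BareProvider {
    region_dir: String,
}

impl IndexPathProvider for BareProvider {
    fn index_file_path(&self, file_id: &str, version: u64) -> String {
        if version == 0 {
            format!("{}/index/{}.puffin", self.region_dir, file_id)
        } else {
            format!("{}/index/{}.{}.puffin", self.region_dir, file_id, version)
        }
    }
}

fn main() {
    let provider = BareProvider { region_dir: "table_dir/1_0000000002".to_string() };
    assert_eq!(
        provider.index_file_path("f1", 2),
        "table_dir/1_0000000002/index/f1.2.puffin"
    );
}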
async fn cached_blob_reader( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, ) -> Result> { let Some(file_cache) = &self.file_cache else { return Ok(None); }; - let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin); + let index_key = IndexKey::new( + file_id.region_id(), + file_id.file_id(), + FileType::Puffin(file_id.version), + ); if file_cache.get(index_key).await.is_none() { return Ok(None); }; @@ -303,7 +308,7 @@ impl InvertedIndexApplier { /// Creates a blob reader from the remote index file. async fn remote_blob_reader( &self, - file_id: RegionFileId, + file_id: RegionIndexId, file_size_hint: Option, ) -> Result { let puffin_manager = self @@ -349,6 +354,7 @@ mod tests { use store_api::storage::FileId; use super::*; + use crate::sst::index::RegionFileId; #[tokio::test] async fn test_index_applier_apply_basic() { @@ -356,13 +362,14 @@ mod tests { PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await; let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); let file_id = RegionFileId::new(0.into(), FileId::random()); + let index_id = RegionIndexId::new(file_id, 0); let table_dir = "table_dir".to_string(); let puffin_manager = puffin_manager_factory.build( object_store.clone(), RegionFilePathFactory::new(table_dir.clone(), PathType::Bare), ); - let mut writer = puffin_manager.writer(&file_id).await.unwrap(); + let mut writer = puffin_manager.writer(&index_id).await.unwrap(); writer .put_blob( INDEX_BLOB_TYPE, @@ -392,7 +399,7 @@ mod tests { puffin_manager_factory, Default::default(), ); - let output = sst_index_applier.apply(file_id, None, None).await.unwrap(); + let output = sst_index_applier.apply(index_id, None, None).await.unwrap(); assert_eq!( output, ApplyOutput { @@ -410,13 +417,14 @@ mod tests { .await; let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); let file_id = RegionFileId::new(0.into(), FileId::random()); + let index_id = RegionIndexId::new(file_id, 0); let table_dir = "table_dir".to_string(); let puffin_manager = puffin_manager_factory.build( object_store.clone(), RegionFilePathFactory::new(table_dir.clone(), PathType::Bare), ); - let mut writer = puffin_manager.writer(&file_id).await.unwrap(); + let mut writer = puffin_manager.writer(&index_id).await.unwrap(); writer .put_blob( "invalid_blob_type", @@ -440,7 +448,7 @@ mod tests { puffin_manager_factory, Default::default(), ); - let res = sst_index_applier.apply(file_id, None, None).await; + let res = sst_index_applier.apply(index_id, None, None).await; assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found")); } } diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index a784d01c21..386ee11b9b 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -466,7 +466,7 @@ mod tests { use crate::cache::index::inverted_index::InvertedIndexCache; use crate::metrics::CACHE_BYTES; use crate::read::BatchColumn; - use crate::sst::file::RegionFileId; + use crate::sst::file::{RegionFileId, RegionIndexId}; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -591,7 +591,8 @@ mod tests { ); let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id); - let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap(); + let index_id = 
RegionIndexId::new(sst_file_id, 0); + let mut writer = puffin_manager.writer(&index_id).await.unwrap(); let (row_count, _) = creator.finish(&mut writer).await.unwrap(); assert_eq!(row_count, rows.len() * segment_row_count); writer.finish().await.unwrap(); @@ -615,7 +616,7 @@ mod tests { .unwrap(); Box::pin(async move { applier - .apply(sst_file_id, None, None) + .apply(index_id, None, None) .await .unwrap() .matched_segment_ids diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index 0a9d25e1fe..edff8aab58 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -32,14 +32,14 @@ use crate::metrics::{ INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, StagerMetrics, }; -use crate::sst::file::RegionFileId; +use crate::sst::file::RegionIndexId; use crate::sst::index::store::{self, InstrumentedStore}; type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>; type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>; pub(crate) type SstPuffinManager = - FsPuffinManager>, ObjectStorePuffinFileAccessor>; + FsPuffinManager>, ObjectStorePuffinFileAccessor>; pub(crate) type SstPuffinReader = ::Reader; pub(crate) type SstPuffinWriter = ::Writer; pub(crate) type SstPuffinBlob = ::Blob; @@ -52,7 +52,7 @@ const STAGING_DIR: &str = "staging"; #[derive(Clone)] pub struct PuffinManagerFactory { /// The stager used by the puffin manager. - stager: Arc>, + stager: Arc>, /// The size of the write buffer used to create object store. write_buffer_size: Option, @@ -92,7 +92,7 @@ impl PuffinManagerFactory { SstPuffinManager::new(self.stager.clone(), puffin_file_accessor) } - pub(crate) async fn purge_stager(&self, file_id: RegionFileId) -> Result<()> { + pub(crate) async fn purge_stager(&self, file_id: RegionIndexId) -> Result<()> { self.stager .purge(&file_id) .await @@ -136,16 +136,22 @@ impl ObjectStorePuffinFileAccessor { path_provider, } } + + pub fn store(&self) -> &InstrumentedStore { + &self.object_store + } } #[async_trait] impl PuffinFileAccessor for ObjectStorePuffinFileAccessor { type Reader = InstrumentedRangeReader; type Writer = InstrumentedAsyncWrite; - type FileHandle = RegionFileId; + type FileHandle = RegionIndexId; - async fn reader(&self, handle: &RegionFileId) -> PuffinResult { - let file_path = self.path_provider.build_index_file_path(*handle); + async fn reader(&self, handle: &RegionIndexId) -> PuffinResult { + let file_path = self + .path_provider + .build_index_file_path_with_version(*handle); self.object_store .range_reader( &file_path, @@ -157,8 +163,10 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor { .context(puffin_error::ExternalSnafu) } - async fn writer(&self, handle: &RegionFileId) -> PuffinResult { - let file_path = self.path_provider.build_index_file_path(*handle); + async fn writer(&self, handle: &RegionIndexId) -> PuffinResult { + let file_path = self + .path_provider + .build_index_file_path_with_version(*handle); self.object_store .writer( &file_path, @@ -184,7 +192,7 @@ mod tests { use store_api::storage::FileId; use super::*; - use crate::sst::file::RegionFileId; + use crate::sst::file::{RegionFileId, RegionIndexId}; struct TestFilePathProvider; @@ -193,6 +201,10 @@ mod tests { file_id.file_id().to_string() } + fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String { + 
index_id.file_id.file_id().to_string() + } + fn build_sst_file_path(&self, file_id: RegionFileId) -> String { file_id.file_id().to_string() } @@ -206,7 +218,7 @@ mod tests { let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); let manager = factory.build(object_store, TestFilePathProvider); - let file_id = RegionFileId::new(0.into(), FileId::random()); + let file_id = RegionIndexId::new(RegionFileId::new(0.into(), FileId::random()), 0); let blob_key = "blob-key"; let dir_key = "dir-key"; let raw_data = b"hello world!"; diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index f37fdc5c7a..1662c6d876 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -49,6 +49,10 @@ impl InstrumentedStore { } } + pub fn store(&self) -> &ObjectStore { + &self.object_store + } + /// Set the size of the write buffer. pub fn with_write_buffer_size(mut self, write_buffer_size: Option) -> Self { self.write_buffer_size = write_buffer_size.filter(|&size| size > 0); diff --git a/src/mito2/src/sst/location.rs b/src/mito2/src/sst/location.rs index 8b9fa9f88a..f3d9e1bdeb 100644 --- a/src/mito2/src/sst/location.rs +++ b/src/mito2/src/sst/location.rs @@ -20,7 +20,7 @@ use store_api::region_request::PathType; use store_api::storage::{FileId, RegionId}; use crate::error::UnexpectedSnafu; -use crate::sst::file::RegionFileId; +use crate::sst::file::{RegionFileId, RegionIndexId}; /// Generate region dir from table_dir, region_id and path_type pub fn region_dir_from_table_dir( @@ -46,14 +46,68 @@ pub fn sst_file_path(table_dir: &str, region_file_id: RegionFileId, path_type: P ) } -pub fn index_file_path( +pub fn index_file_path(table_dir: &str, index_id: RegionIndexId, path_type: PathType) -> String { + let region_dir = region_dir_from_table_dir(table_dir, index_id.file_id.region_id(), path_type); + let index_dir = util::join_dir(®ion_dir, "index"); + + let filename = if index_id.version == 0 { + format!("{}.puffin", index_id.file_id.file_id()) + } else { + format!("{}.{}.puffin", index_id.file_id.file_id(), index_id.version) + }; + + util::join_path(&index_dir, &filename) +} + +/// Legacy function for backward compatibility - creates index file path using RegionFileId with version 0 +pub fn index_file_path_legacy( table_dir: &str, region_file_id: RegionFileId, path_type: PathType, ) -> String { - let region_dir = region_dir_from_table_dir(table_dir, region_file_id.region_id(), path_type); - let index_dir = util::join_dir(®ion_dir, "index"); - util::join_path(&index_dir, &format!("{}.puffin", region_file_id.file_id())) + let index_id = RegionIndexId::new(region_file_id, 0); + index_file_path(table_dir, index_id, path_type) +} + +/// Parse file ID and version from index filename +pub fn parse_index_file_info(filepath: &str) -> crate::error::Result<(FileId, u64)> { + let filename = filepath.rsplit('/').next().context(UnexpectedSnafu { + reason: format!("invalid file path: {}", filepath), + })?; + let parts: Vec<&str> = filename.split('.').collect(); + + if parts.len() == 2 && parts[1] == "puffin" { + // Legacy format: {file_id}.puffin (version 0) + let file_id = parts[0]; + FileId::parse_str(file_id).map(|id| (id, 0)).map_err(|e| { + UnexpectedSnafu { + reason: format!("invalid file id: {}, err: {}", file_id, e), + } + .build() + }) + } else if parts.len() == 3 && parts[2] == "puffin" { + // New format: {file_id}.{version}.puffin + let file_id = parts[0]; + let version = parts[1].parse::().map_err(|_| { + UnexpectedSnafu { + reason: 
format!("invalid version in file name: {}", filename), + } + .build() + })?; + FileId::parse_str(file_id) + .map(|id| (id, version)) + .map_err(|e| { + UnexpectedSnafu { + reason: format!("invalid file id: {}, err: {}", file_id, e), + } + .build() + }) + } else { + UnexpectedSnafu { + reason: format!("invalid index file name: {}", filename), + } + .fail() + } } /// Get RegionFileId from sst or index filename @@ -111,17 +165,59 @@ mod tests { fn test_index_file_path() { let file_id = FileId::random(); let region_file_id = RegionFileId::new(RegionId::new(1, 2), file_id); + let index_id = RegionIndexId::new(region_file_id, 0); assert_eq!( - index_file_path("table_dir", region_file_id, PathType::Bare), + index_file_path("table_dir", index_id, PathType::Bare), format!("table_dir/1_0000000002/index/{}.puffin", file_id) ); assert_eq!( - index_file_path("table_dir", region_file_id, PathType::Data), + index_file_path("table_dir", index_id, PathType::Data), format!("table_dir/1_0000000002/data/index/{}.puffin", file_id) ); assert_eq!( - index_file_path("table_dir", region_file_id, PathType::Metadata), + index_file_path("table_dir", index_id, PathType::Metadata), format!("table_dir/1_0000000002/metadata/index/{}.puffin", file_id) ); } + + #[test] + fn test_index_file_path_versioned() { + let file_id = FileId::random(); + let region_file_id = RegionFileId::new(RegionId::new(1, 2), file_id); + let index_id_v1 = RegionIndexId::new(region_file_id, 1); + let index_id_v2 = RegionIndexId::new(region_file_id, 2); + + assert_eq!( + index_file_path("table_dir", index_id_v1, PathType::Bare), + format!("table_dir/1_0000000002/index/{}.1.puffin", file_id) + ); + assert_eq!( + index_file_path("table_dir", index_id_v2, PathType::Bare), + format!("table_dir/1_0000000002/index/{}.2.puffin", file_id) + ); + } + + #[test] + fn test_parse_index_file_info() { + // Test legacy format + let file_id = FileId::random(); + let result = + parse_index_file_info(&format!("table_dir/1_0000000002/index/{file_id}.puffin")) + .unwrap(); + assert_eq!(result.0.to_string(), file_id.to_string()); + assert_eq!(result.1, 0); + + // Test versioned format + let result = + parse_index_file_info(&format!("table_dir/1_0000000002/index/{file_id}.1.puffin")) + .unwrap(); + assert_eq!(result.0.to_string(), file_id.to_string()); + assert_eq!(result.1, 1); + + let result = + parse_index_file_info(&format!("table_dir/1_0000000002/index/{file_id}.42.puffin")) + .unwrap(); + assert_eq!(result.0.to_string(), file_id.to_string()); + assert_eq!(result.1, 42); + } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 4553372569..37918553d5 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -117,7 +117,7 @@ mod tests { use crate::config::IndexConfig; use crate::read::{BatchBuilder, BatchReader, FlatSource}; use crate::region::options::{IndexOptions, InvertedIndexOptions}; - use crate::sst::file::{FileHandle, FileMeta, RegionFileId}; + use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId}; use crate::sst::file_purger::NoopFilePurger; use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; @@ -144,7 +144,11 @@ mod tests { impl FilePathProvider for FixedPathProvider { fn build_index_file_path(&self, _file_id: RegionFileId) -> String { - location::index_file_path(FILE_DIR, self.region_file_id, PathType::Bare) + location::index_file_path_legacy(FILE_DIR, self.region_file_id, 
PathType::Bare) + } + + fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String { + location::index_file_path(FILE_DIR, index_id, PathType::Bare) } fn build_sst_file_path(&self, _file_id: RegionFileId) -> String { @@ -156,7 +160,7 @@ mod tests { #[async_trait::async_trait] impl IndexerBuilder for NoopIndexBuilder { - async fn build(&self, _file_id: FileId) -> Indexer { + async fn build(&self, _file_id: FileId, _index_version: u64) -> Indexer { Indexer::default() } } @@ -711,6 +715,7 @@ mod tests { metadata: metadata.clone(), row_group_size, puffin_manager, + write_cache_enabled: false, intermediate_manager, index_options: IndexOptions { inverted_index: InvertedIndexOptions { @@ -769,7 +774,7 @@ mod tests { available_indexes: info.index_metadata.build_available_indexes(), indexes: info.index_metadata.build_indexes(), index_file_size: info.index_metadata.file_size, - index_file_id: None, + index_version: 0, num_row_groups: info.num_row_groups, num_rows: info.num_rows as u64, sequence: None, @@ -1090,6 +1095,7 @@ mod tests { metadata: metadata.clone(), row_group_size, puffin_manager, + write_cache_enabled: false, intermediate_manager, index_options: IndexOptions { inverted_index: InvertedIndexOptions { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 3fbb656471..c9d3664743 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -558,7 +558,7 @@ impl ParquetReaderBuilder { let file_size_hint = self.file_handle.meta_ref().index_file_size(); let apply_res = index_applier .apply_fine( - self.file_handle.file_id(), + self.file_handle.index_id(), Some(file_size_hint), metrics.fulltext_index_apply_metrics.as_mut(), ) @@ -630,7 +630,7 @@ impl ParquetReaderBuilder { let file_size_hint = self.file_handle.meta_ref().index_file_size(); let apply_res = index_applier .apply( - self.file_handle.file_id(), + self.file_handle.index_id(), Some(file_size_hint), metrics.inverted_index_apply_metrics.as_mut(), ) @@ -709,7 +709,7 @@ impl ParquetReaderBuilder { }); let apply_res = index_applier .apply( - self.file_handle.file_id(), + self.file_handle.index_id(), Some(file_size_hint), rgs, metrics.bloom_filter_apply_metrics.as_mut(), @@ -792,7 +792,7 @@ impl ParquetReaderBuilder { }); let apply_res = index_applier .apply_coarse( - self.file_handle.file_id(), + self.file_handle.index_id(), Some(file_size_hint), rgs, metrics.fulltext_index_apply_metrics.as_mut(), diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 5247e2eec8..e30c7fa0b8 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -153,7 +153,7 @@ where metrics: &'a mut Metrics, ) -> ParquetWriter<'a, F, I, P> { let init_file = FileId::random(); - let indexer = indexer_builder.build(init_file).await; + let indexer = indexer_builder.build(init_file, 0).await; ParquetWriter { path_provider, @@ -482,7 +482,7 @@ where .context(WriteParquetSnafu)?; self.writer = Some(arrow_writer); - let indexer = self.indexer_builder.build(self.current_file).await; + let indexer = self.indexer_builder.build(self.current_file, 0).await; self.current_indexer = Some(indexer); // safety: self.writer is assigned above diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 8299c9e3da..d92c628067 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -126,7 +126,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: 
i64, end_ms: i64) available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 0, num_row_groups: 0, num_series: 0, diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 68fe723ab0..b381b16c05 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -104,7 +104,7 @@ impl VersionControlBuilder { available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 0, num_row_groups: 0, num_series: 0, @@ -195,7 +195,7 @@ pub(crate) fn apply_edit( available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, - index_file_id: None, + index_version: 0, num_rows: 0, num_row_groups: 0, num_series: 0, diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 433c440639..a1abde753b 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -481,12 +481,12 @@ async fn edit_region( let index_file_index_key = IndexKey::new( region_id, - file_meta.index_file_id().file_id(), - FileType::Puffin, + file_meta.index_id().file_id.file_id(), + FileType::Puffin(file_meta.index_version), ); let index_remote_path = location::index_file_path( layer.table_dir(), - file_meta.file_id(), + file_meta.index_id(), layer.path_type(), ); diff --git a/src/mito2/src/worker/handle_rebuild_index.rs b/src/mito2/src/worker/handle_rebuild_index.rs index 5030cd77cd..54f7f8abaf 100644 --- a/src/mito2/src/worker/handle_rebuild_index.rs +++ b/src/mito2/src/worker/handle_rebuild_index.rs @@ -28,7 +28,7 @@ use crate::region::MitoRegionRef; use crate::request::{ BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx, }; -use crate::sst::file::{FileHandle, RegionFileId}; +use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId}; use crate::sst::index::{ IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender, }; @@ -68,6 +68,7 @@ impl RegionWorkerLoop { row_group_size: WriteOptions::default().row_group_size, intermediate_manager, puffin_manager, + write_cache_enabled: self.cache_manager.write_cache().is_some(), }); IndexBuildTask { @@ -216,7 +217,8 @@ impl RegionWorkerLoop { let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone()); for file_meta in &request.edit.files_to_add { let region_file_id = RegionFileId::new(region_id, file_meta.file_id); - cache_strategy.evict_puffin_cache(region_file_id).await; + let index_id = RegionIndexId::new(region_file_id, file_meta.index_version); + cache_strategy.evict_puffin_cache(index_id).await; } region.version_control.apply_edit( diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs index 61f6d5b597..8b4e6e64a7 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs @@ -56,6 +56,10 @@ impl FsPuffinManager { self.puffin_metadata_cache = puffin_metadata_cache; self } + + pub fn file_accessor(&self) -> &F { + &self.puffin_file_accessor + } } #[async_trait] diff --git a/src/store-api/src/sst_entry.rs b/src/store-api/src/sst_entry.rs index d71e5f0cdc..832bfc1155 100644 --- a/src/store-api/src/sst_entry.rs +++ b/src/store-api/src/sst_entry.rs @@ -47,8 +47,8 @@ pub struct ManifestSstEntry { pub region_sequence: RegionSeq, /// Engine-specific 
file identifier (string form). pub file_id: String, - /// Engine-specific index file identifier (string form). - pub index_file_id: Option, + /// Index version, increment when the index file is rebuilt. + pub index_version: u64, /// SST level. pub level: u8, /// Full path of the SST file in object store. @@ -91,7 +91,7 @@ impl ManifestSstEntry { ColumnSchema::new("region_group", Ty::uint8_datatype(), false), ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false), ColumnSchema::new("file_id", Ty::string_datatype(), false), - ColumnSchema::new("index_file_id", Ty::string_datatype(), true), + ColumnSchema::new("index_version", Ty::uint64_datatype(), false), ColumnSchema::new("level", Ty::uint8_datatype(), false), ColumnSchema::new("file_path", Ty::string_datatype(), false), ColumnSchema::new("file_size", Ty::uint64_datatype(), false), @@ -119,7 +119,7 @@ impl ManifestSstEntry { let region_groups = entries.iter().map(|e| e.region_group); let region_sequences = entries.iter().map(|e| e.region_sequence); let file_ids = entries.iter().map(|e| e.file_id.as_str()); - let index_file_ids = entries.iter().map(|e| e.index_file_id.as_ref()); + let index_versions = entries.iter().map(|e| e.index_version); let levels = entries.iter().map(|e| e.level); let file_paths = entries.iter().map(|e| e.file_path.as_str()); let file_sizes = entries.iter().map(|e| e.file_size); @@ -151,7 +151,7 @@ impl ManifestSstEntry { Arc::new(UInt8Array::from_iter_values(region_groups)), Arc::new(UInt32Array::from_iter_values(region_sequences)), Arc::new(StringArray::from_iter_values(file_ids)), - Arc::new(StringArray::from_iter(index_file_ids)), + Arc::new(UInt64Array::from_iter(index_versions)), Arc::new(UInt8Array::from_iter_values(levels)), Arc::new(StringArray::from_iter_values(file_paths)), Arc::new(UInt64Array::from_iter_values(file_sizes)), @@ -437,7 +437,7 @@ mod tests { region_group: region_group1, region_sequence: region_seq1, file_id: "f1".to_string(), - index_file_id: None, + index_version: 0, level: 1, file_path: "/p1".to_string(), file_size: 100, @@ -461,7 +461,7 @@ mod tests { region_group: region_group2, region_sequence: region_seq2, file_id: "f2".to_string(), - index_file_id: Some("idx".to_string()), + index_version: 1, level: 3, file_path: "/p2".to_string(), file_size: 200, @@ -548,13 +548,13 @@ mod tests { assert_eq!("f1", file_ids.value(0)); assert_eq!("f2", file_ids.value(1)); - let index_file_ids = batch + let index_versions = batch .column(7) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap(); - assert!(index_file_ids.is_null(0)); - assert_eq!("idx", index_file_ids.value(1)); + assert_eq!(0, index_versions.value(0)); + assert_eq!(1, index_versions.value(1)); let levels = batch .column(8) diff --git a/tests/cases/standalone/common/information_schema/ssts.result b/tests/cases/standalone/common/information_schema/ssts.result index f9ac0dd47b..bf0642f667 100644 --- a/tests/cases/standalone/common/information_schema/ssts.result +++ b/tests/cases/standalone/common/information_schema/ssts.result @@ -10,7 +10,7 @@ DESC TABLE information_schema.ssts_manifest; | region_group | UInt8 | | NO | | FIELD | | region_sequence | UInt32 | | NO | | FIELD | | file_id | String | | NO | | FIELD | -| index_file_id | String | | YES | | FIELD | +| index_version | UInt64 | | NO | | FIELD | | level | UInt8 | | NO | | FIELD | | file_path | String | | NO | | FIELD | | file_size | UInt64 | | NO | | FIELD | @@ -97,13 +97,13 @@ ADMIN FLUSH_TABLE('sst_case'); -- SQLNESS REPLACE (/public/\d+) /public/ SELECT * FROM 

-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
-| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
-| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_version | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+| data/greptime/public// |||||| ||| data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
+| data/greptime/public// |||||| ||| data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
+| data/greptime/public// |||||| ||| data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+

 -- SQLNESS REPLACE (\s+\d+\s+)
 -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})
@@ -165,15 +165,15 @@ ADMIN FLUSH_TABLE('sst_case');
 -- SQLNESS REPLACE (/public/\d+) /public/
 SELECT * FROM information_schema.ssts_manifest order by file_path;

-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
-| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
-| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
-| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
-| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
-| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_version | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible |
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+
+| data/greptime/public// |||||| ||| data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
+| data/greptime/public// |||||| ||| data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
+| data/greptime/public// |||||| ||| data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
+| data/greptime/public// |||||| ||| data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
+| data/greptime/public// |||||| ||| data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true |
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+---------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+

 -- SQLNESS REPLACE (\s+\d+\s+)
 -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})
diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result
index 9c124de6d9..04ed933355 100644
--- a/tests/cases/standalone/common/system/information_schema.result
+++ b/tests/cases/standalone/common/system/information_schema.result
@@ -400,9 +400,9 @@ select * from information_schema.columns order by table_schema, table_name, colu
 | greptime | information_schema | ssts_manifest | file_id | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
 | greptime | information_schema | ssts_manifest | file_path | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
 | greptime | information_schema | ssts_manifest | file_size | 11 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
-| greptime | information_schema | ssts_manifest | index_file_id | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
 | greptime | information_schema | ssts_manifest | index_file_path | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
 | greptime | information_schema | ssts_manifest | index_file_size | 13 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
+| greptime | information_schema | ssts_manifest | index_version | 8 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
 | greptime | information_schema | ssts_manifest | level | 9 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | |
 | greptime | information_schema | ssts_manifest | max_ts | 18 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |
 | greptime | information_schema | ssts_manifest | min_ts | 17 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | |