From f06a64ff90632cf258ff09b6bd9b64eeef574936 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 11 Dec 2025 20:08:45 +0800 Subject: [PATCH 1/2] feat: mark index outdated (#7383) * feat: mark index outdated Signed-off-by: discord9 * refactor: move IndexVerwsion to store-api Signed-off-by: discord9 * per review Signed-off-by: discord9 * fix: condition for add files Signed-off-by: discord9 * cleanup Signed-off-by: discord9 * refactor(sst): extract index version check into method Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/cmd/src/datanode/objbench.rs | 2 +- src/mito2/src/access_layer.rs | 31 +++-- .../src/cache/index/bloom_filter_index.rs | 3 +- src/mito2/src/cache/index/inverted_index.rs | 3 +- src/mito2/src/sst/file.rs | 120 +++++++++++++----- src/mito2/src/sst/file_purger.rs | 26 +++- src/mito2/src/sst/index.rs | 53 +++++--- src/mito2/src/sst/version.rs | 21 ++- src/mito2/src/worker/handle_rebuild_index.rs | 1 + src/store-api/src/storage.rs | 2 +- src/store-api/src/storage/file.rs | 3 + 11 files changed, 195 insertions(+), 70 deletions(-) diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index bdb12bcbcb..61b47552eb 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -564,7 +564,7 @@ fn new_noop_file_purger() -> FilePurgerRef { #[derive(Debug)] struct Noop; impl FilePurger for Noop { - fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {} + fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {} } Arc::new(Noop) } diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index ea7cbaca97..8888ade815 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -228,22 +228,31 @@ impl AccessLayer { // Delete all versions of the index file. for version in 0..=index_file_id.version { - let path = location::index_file_path( - &self.table_dir, - RegionIndexId::new(index_file_id.file_id, version), - self.path_type, - ); - self.object_store - .delete(&path) - .await - .context(DeleteIndexSnafu { - file_id: region_file_id.file_id(), - })?; + let index_id = RegionIndexId::new(*region_file_id, version); + self.delete_index(index_id).await?; } Ok(()) } + pub(crate) async fn delete_index( + &self, + index_file_id: RegionIndexId, + ) -> Result<(), crate::error::Error> { + let path = location::index_file_path( + &self.table_dir, + RegionIndexId::new(index_file_id.file_id, index_file_id.version), + self.path_type, + ); + self.object_store + .delete(&path) + .await + .context(DeleteIndexSnafu { + file_id: index_file_id.file_id(), + })?; + Ok(()) + } + /// Returns the directory of the region in the table. pub fn build_region_dir(&self, region_id: RegionId) -> String { region_dir_from_table_dir(&self.table_dir, region_id, self.path_type) diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index fc39192c98..707afb18a6 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -21,11 +21,10 @@ use async_trait::async_trait; use bytes::Bytes; use index::bloom_filter::error::Result; use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader}; -use store_api::storage::{ColumnId, FileId}; +use store_api::storage::{ColumnId, FileId, IndexVersion}; use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey}; use crate::metrics::{CACHE_HIT, CACHE_MISS}; -use crate::sst::file::IndexVersion; const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index"; diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index f99fa0218c..4c1b07126c 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -22,11 +22,10 @@ use bytes::Bytes; use index::inverted_index::error::Result; use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; use prost::Message; -use store_api::storage::FileId; +use store_api::storage::{FileId, IndexVersion}; use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey}; use crate::metrics::{CACHE_HIT, CACHE_MISS}; -use crate::sst::file::IndexVersion; const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index"; diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index bd12720011..7ab8ee132b 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use store_api::metadata::ColumnMetadata; use store_api::region_request::PathType; -use store_api::storage::{ColumnId, FileId, RegionId}; +use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId}; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; @@ -82,8 +82,6 @@ pub type Level = u8; pub const MAX_LEVEL: Level = 2; /// Type to store index types for a column. pub type IndexTypes = SmallVec<[IndexType; 4]>; -/// Index version -pub type IndexVersion = u64; /// Cross-region file id. /// @@ -308,6 +306,11 @@ impl FileMeta { !self.available_indexes.is_empty() } + /// Whether the index file is up-to-date comparing to another file meta. + pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool { + self.exists_index() && other.exists_index() && self.index_version >= other.index_version + } + /// Returns true if the file has an inverted index pub fn inverted_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::InvertedIndex) @@ -434,6 +437,16 @@ impl FileHandle { self.inner.compacting.store(compacting, Ordering::Relaxed); } + pub fn index_outdated(&self) -> bool { + self.inner.index_outdated.load(Ordering::Relaxed) + } + + pub fn set_index_outdated(&self, index_outdated: bool) { + self.inner + .index_outdated + .store(index_outdated, Ordering::Relaxed); + } + /// Returns a reference to the [FileMeta]. pub fn meta_ref(&self) -> &FileMeta { &self.inner.meta @@ -471,23 +484,29 @@ struct FileHandleInner { meta: FileMeta, compacting: AtomicBool, deleted: AtomicBool, + index_outdated: AtomicBool, file_purger: FilePurgerRef, } impl Drop for FileHandleInner { fn drop(&mut self) { - self.file_purger - .remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed)); + self.file_purger.remove_file( + self.meta.clone(), + self.deleted.load(Ordering::Acquire), + self.index_outdated.load(Ordering::Acquire), + ); } } impl FileHandleInner { + /// There should only be one `FileHandleInner` for each file on a datanode fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner { file_purger.new_file(&meta); FileHandleInner { meta, compacting: AtomicBool::new(false), deleted: AtomicBool::new(false), + index_outdated: AtomicBool::new(false), file_purger, } } @@ -540,38 +559,77 @@ pub async fn delete_files( ); for (file_id, index_version) in file_ids { - if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) { - // Removes index file from the cache. - if delete_index { - write_cache - .remove(IndexKey::new( - region_id, - *file_id, - FileType::Puffin(*index_version), - )) - .await; - } + purge_index_cache_stager( + region_id, + delete_index, + access_layer, + cache_manager, + *file_id, + *index_version, + ) + .await; + } + Ok(()) +} - // Remove the SST file from the cache. +pub async fn delete_index( + region_index_id: RegionIndexId, + access_layer: &AccessLayerRef, + cache_manager: &Option, +) -> crate::error::Result<()> { + access_layer.delete_index(region_index_id).await?; + + purge_index_cache_stager( + region_index_id.region_id(), + true, + access_layer, + cache_manager, + region_index_id.file_id(), + region_index_id.version, + ) + .await; + + Ok(()) +} + +async fn purge_index_cache_stager( + region_id: RegionId, + delete_index: bool, + access_layer: &AccessLayerRef, + cache_manager: &Option, + file_id: FileId, + index_version: u64, +) { + if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) { + // Removes index file from the cache. + if delete_index { write_cache - .remove(IndexKey::new(region_id, *file_id, FileType::Parquet)) + .remove(IndexKey::new( + region_id, + file_id, + FileType::Puffin(index_version), + )) .await; } - // Purges index content in the stager. - if let Err(e) = access_layer - .puffin_manager_factory() - .purge_stager(RegionIndexId::new( - RegionFileId::new(region_id, *file_id), - *index_version, - )) - .await - { - error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}", - file_id, index_version, region_id); - } + // Remove the SST file from the cache. + write_cache + .remove(IndexKey::new(region_id, file_id, FileType::Parquet)) + .await; + } + + // Purges index content in the stager. + if let Err(e) = access_layer + .puffin_manager_factory() + .purge_stager(RegionIndexId::new( + RegionFileId::new(region_id, file_id), + index_version, + )) + .await + { + error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}", + file_id, index_version, region_id); } - Ok(()) } #[cfg(test)] diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index fd405896b0..2715223a51 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -21,7 +21,7 @@ use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; use crate::error::Result; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::{FileMeta, delete_files}; +use crate::sst::file::{FileMeta, delete_files, delete_index}; use crate::sst::file_ref::FileReferenceManagerRef; /// A worker to delete files in background. @@ -29,7 +29,8 @@ pub trait FilePurger: Send + Sync + fmt::Debug { /// Send a request to remove the file. /// If `is_delete` is true, the file will be deleted from the storage. /// Otherwise, only the reference will be removed. - fn remove_file(&self, file_meta: FileMeta, is_delete: bool); + /// If `index_outdated` is true, the index file will be deleted regardless of `is_delete`. + fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool); /// Notify the purger of a new file created. /// This is useful for object store based storage, where we need to track the file references @@ -46,7 +47,7 @@ pub type FilePurgerRef = Arc; pub struct NoopFilePurger; impl FilePurger for NoopFilePurger { - fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) { + fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) { // noop } } @@ -142,12 +143,27 @@ impl LocalFilePurger { error!(e; "Failed to schedule the file purge request"); } } + + fn delete_index(&self, file_meta: FileMeta) { + let sst_layer = self.sst_layer.clone(); + let cache_manager = self.cache_manager.clone(); + if let Err(e) = self.scheduler.schedule(Box::pin(async move { + let index_id = file_meta.index_id(); + if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await { + error!(e; "Failed to delete index for file {:?} from storage", file_meta); + } + })) { + error!(e; "Failed to schedule the index purge request"); + } + } } impl FilePurger for LocalFilePurger { - fn remove_file(&self, file_meta: FileMeta, is_delete: bool) { + fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) { if is_delete { self.delete_file(file_meta); + } else if index_outdated { + self.delete_index(file_meta); } } } @@ -158,7 +174,7 @@ pub struct ObjectStoreFilePurger { } impl FilePurger for ObjectStoreFilePurger { - fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) { + fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) { // if not on local file system, instead inform the global file purger to remove the file reference. // notice that no matter whether the file is deleted or not, we need to remove the reference // because the file is no longer in use nonetheless. diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 12f3d9f1d7..c44535a895 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -503,6 +503,8 @@ pub type ResultMpscSender = Sender>; #[derive(Clone)] pub struct IndexBuildTask { + /// The SST file handle to build index for. + pub file: FileHandle, /// The file meta to build index for. pub file_meta: FileMeta, pub reason: IndexBuildType, @@ -651,10 +653,7 @@ impl IndexBuildTask { let mut parquet_reader = self .access_layer - .read_sst(FileHandle::new( - self.file_meta.clone(), - self.file_purger.clone(), - )) + .read_sst(self.file.clone()) // use the latest file handle instead of creating a new one .build() .await?; @@ -1498,14 +1497,19 @@ mod tests { let region_id = metadata.region_id; let indexer_builder = mock_indexer_builder(metadata, &env).await; + let file_meta = FileMeta { + region_id, + file_id: FileId::random(), + file_size: 100, + ..Default::default() + }; + + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let task = IndexBuildTask { - file_meta: FileMeta { - region_id, - file_id: FileId::random(), - file_size: 100, - ..Default::default() - }, + file, + file_meta, reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), listener: WorkerListener::default(), @@ -1555,10 +1559,13 @@ mod tests { mock_version_control(metadata.clone(), file_purger.clone(), files).await; let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await; + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let (tx, mut rx) = mpsc::channel(4); let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { + file, file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), @@ -1626,10 +1633,13 @@ mod tests { mock_version_control(metadata.clone(), file_purger.clone(), files).await; let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await; + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let (tx, _rx) = mpsc::channel(4); let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { + file, file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), @@ -1726,10 +1736,13 @@ mod tests { mock_version_control(metadata.clone(), file_purger.clone(), files).await; let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await; + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let (tx, mut rx) = mpsc::channel(4); let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { + file, file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), @@ -1813,10 +1826,13 @@ mod tests { let version_control = mock_version_control(metadata.clone(), file_purger.clone(), files).await; + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + // Create mock task. let (tx, mut _rx) = mpsc::channel(4); let (result_tx, mut result_rx) = mpsc::channel::>(4); let task = IndexBuildTask { + file, file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), @@ -1864,13 +1880,18 @@ mod tests { let (tx, _rx) = mpsc::channel(4); let (result_tx, _result_rx) = mpsc::channel::>(4); + let file_meta = FileMeta { + region_id, + file_id, + file_size: 100, + ..Default::default() + }; + + let file = FileHandle::new(file_meta.clone(), file_purger.clone()); + IndexBuildTask { - file_meta: FileMeta { - region_id, - file_id, - file_size: 100, - ..Default::default() - }, + file, + file_meta, reason, access_layer: env.access_layer.clone(), listener: WorkerListener::default(), diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index 6cae6ce83d..a9e71eb5d9 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -57,9 +57,28 @@ impl SstVersion { ) { for file in files_to_add { let level = file.level; + let new_index_version = file.index_version; + // If the file already exists, then we should only replace the handle when the index is outdated. self.levels[level as usize] .files - .insert(file.file_id, FileHandle::new(file, file_purger.clone())); + .entry(file.file_id) + .and_modify(|f| { + if *f.meta_ref() == file || f.meta_ref().is_index_up_to_date(&file) { + // same file meta or current file handle's index is up-to-date, skip adding + if f.index_id().version > new_index_version { + // what does it mean for us to see older index version? + common_telemetry::warn!( + "Adding file with older index version, existing: {:?}, new: {:?}, ignoring new file", + f.meta_ref(), + file + ); + } + } else { + // include case like old file have no index or index is outdated + *f = FileHandle::new(file.clone(), file_purger.clone()); + } + }) + .or_insert_with(|| FileHandle::new(file.clone(), file_purger.clone())); } } diff --git a/src/mito2/src/worker/handle_rebuild_index.rs b/src/mito2/src/worker/handle_rebuild_index.rs index 54f7f8abaf..ed2390d853 100644 --- a/src/mito2/src/worker/handle_rebuild_index.rs +++ b/src/mito2/src/worker/handle_rebuild_index.rs @@ -72,6 +72,7 @@ impl RegionWorkerLoop { }); IndexBuildTask { + file: file.clone(), file_meta: file.meta_ref().clone(), reason: build_type, access_layer: access_layer.clone(), diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 2cafaf027c..36b28b511c 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -26,6 +26,6 @@ pub use datatypes::schema::{ }; pub use self::descriptors::*; -pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError}; +pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError}; pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; pub use self::types::{SequenceNumber, SequenceRange}; diff --git a/src/store-api/src/storage/file.rs b/src/store-api/src/storage/file.rs index e05a818066..bb7490ccf5 100644 --- a/src/store-api/src/storage/file.rs +++ b/src/store-api/src/storage/file.rs @@ -24,6 +24,9 @@ use uuid::Uuid; use crate::ManifestVersion; use crate::storage::RegionId; +/// Index version +pub type IndexVersion = u64; + #[derive(Debug, Snafu, PartialEq)] pub struct ParseIdError { source: uuid::Error, From ba4eda40e5552a1dd50e4c42868d6c9db3efccd5 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 11 Dec 2025 21:32:11 +0800 Subject: [PATCH 2/2] refactor: optimize heartbeat channel and etcd client keepalive settings (#7390) Signed-off-by: WenyXu --- src/common/meta/src/distributed_time_constants.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs index fefb175860..688e7a424a 100644 --- a/src/common/meta/src/distributed_time_constants.rs +++ b/src/common/meta/src/distributed_time_constants.rs @@ -47,21 +47,16 @@ pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2; pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1); /// The keep-alive interval of the heartbeat channel. -pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration = - Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1); +pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration = Duration::from_secs(15); /// The keep-alive timeout of the heartbeat channel. -pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration = - Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1); +pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration = Duration::from_secs(5); /// The default options for the etcd client. pub fn default_etcd_client_options() -> ConnectOptions { ConnectOptions::new() .with_keep_alive_while_idle(true) - .with_keep_alive( - Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1), - Duration::from_secs(10), - ) + .with_keep_alive(Duration::from_secs(15), Duration::from_secs(5)) .with_connect_timeout(Duration::from_secs(10)) }