From 4eb0771afe22959745f7f5c53d08d42a92efaff5 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 21 Mar 2025 13:19:23 +0800 Subject: [PATCH] feat: introduce `install_manifest_to` for `RegionManifestManager` (#5742) * feat: introduce `install_manifest_changes` for `RegionManifestManager` * chore: rename function to `install_manifest_to` * Apply suggestions from code review Co-authored-by: jeremyhi * chore: add comments * chore: add comments * chore: update logic and add comments * chore: add more check * Update src/mito2/src/manifest/manager.rs Co-authored-by: Yingwen --------- Co-authored-by: jeremyhi Co-authored-by: Yingwen --- src/mito2/src/error.rs | 49 ++++++- src/mito2/src/manifest/manager.rs | 162 ++++++++++++++++++++- src/mito2/src/manifest/storage.rs | 35 ++++- src/mito2/src/manifest/tests/checkpoint.rs | 150 +++++++++++++++++++ 4 files changed, 388 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index e5d494f57f..5df76f1e5f 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -781,6 +781,50 @@ pub enum Error { #[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))] ChecksumMismatch { actual: u32, expected: u32 }, + #[snafu(display( + "No checkpoint found, region: {}, last_version: {}", + region_id, + last_version + ))] + NoCheckpoint { + region_id: RegionId, + last_version: ManifestVersion, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "No manifests found in range: [{}..{}), region: {}, last_version: {}", + start_version, + end_version, + region_id, + last_version + ))] + NoManifests { + region_id: RegionId, + start_version: ManifestVersion, + end_version: ManifestVersion, + last_version: ManifestVersion, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Failed to install manifest to {}, region: {}, available manifest version: {}, last version: {}", + target_version, + available_version, + region_id, + last_version + ))] + InstallManifestTo { + region_id: RegionId, + target_version: ManifestVersion, + available_version: ManifestVersion, + #[snafu(implicit)] + location: Location, + last_version: ManifestVersion, + }, + #[snafu(display("Region {} is stopped", region_id))] RegionStopped { region_id: RegionId, @@ -1019,7 +1063,10 @@ impl ErrorExt for Error { | OperateAbortedIndex { .. } | UnexpectedReplay { .. } | IndexEncodeNull { .. } - | UnexpectedImpureDefault { .. } => StatusCode::Unexpected, + | UnexpectedImpureDefault { .. } + | NoCheckpoint { .. } + | NoManifests { .. } + | InstallManifestTo { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } | InvalidScanIndex { .. } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index b77111d570..ead8ce7bfc 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -23,7 +23,9 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION}; use store_api::metadata::RegionMetadataRef; -use crate::error::{self, RegionStoppedSnafu, Result}; +use crate::error::{ + self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result, +}; use crate::manifest::action::{ RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, RegionMetaActionList, @@ -197,9 +199,9 @@ impl RegionManifestManager { let checkpoint = Self::last_checkpoint(&mut store).await?; let last_checkpoint_version = checkpoint .as_ref() - .map(|checkpoint| checkpoint.last_version) + .map(|(checkpoint, _)| checkpoint.last_version) .unwrap_or(MIN_VERSION); - let mut manifest_builder = if let Some(checkpoint) = checkpoint { + let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint { info!( "Recover region manifest {} from checkpoint version {}", options.manifest_dir, checkpoint.last_version @@ -275,6 +277,153 @@ impl RegionManifestManager { self.stopped = true; } + /// Installs the manifest changes from the current version to the target version (inclusive). + /// + /// Returns installed version. + /// **Note**: This function is not guaranteed to install the target version strictly. + /// The installed version may be greater than the target version. + pub async fn install_manifest_to( + &mut self, + target_version: ManifestVersion, + ) -> Result { + let _t = MANIFEST_OP_ELAPSED + .with_label_values(&["install_manifest_to"]) + .start_timer(); + + // Case 1: If the target version is less than the current version, return the current version. + if self.last_version >= target_version { + debug!( + "Target version {} is less than or equal to the current version {}, region: {}, skip install", + target_version, self.last_version, self.manifest.metadata.region_id + ); + return Ok(self.last_version); + } + + ensure!( + !self.stopped, + RegionStoppedSnafu { + region_id: self.manifest.metadata.region_id, + } + ); + + // Fetches manifests from the last version strictly. + let mut manifests = self + .store + // Invariant: last_version < target_version. + .fetch_manifests_strict_from(self.last_version + 1, target_version + 1) + .await?; + + // Case 2: No manifests in range: [current_version+1, target_version+1) + // + // |---------Has been deleted------------| [Checkpoint Version]...[Latest Version] + // [Leader region] + // [Current Version]......[Target Version] + // [Follower region] + if manifests.is_empty() { + debug!( + "Manifests are not strict from {}, region: {}, tries to install the last checkpoint", + self.last_version, self.manifest.metadata.region_id + ); + let last_version = self.install_last_checkpoint().await?; + // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version. + if last_version >= target_version { + return Ok(last_version); + } + + // Fetches manifests from the installed version strictly. + manifests = self + .store + // Invariant: last_version < target_version. + .fetch_manifests_strict_from(last_version + 1, target_version + 1) + .await?; + } + + if manifests.is_empty() { + return NoManifestsSnafu { + region_id: self.manifest.metadata.region_id, + start_version: self.last_version + 1, + end_version: target_version + 1, + last_version: self.last_version, + } + .fail(); + } + + debug_assert_eq!(manifests.first().unwrap().0, self.last_version + 1); + let mut manifest_builder = + RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone())); + + for (manifest_version, raw_action_list) in manifests { + self.store + .set_delta_file_size(manifest_version, raw_action_list.len() as u64); + let action_list = RegionMetaActionList::decode(&raw_action_list)?; + for action in action_list.actions { + match action { + RegionMetaAction::Change(action) => { + manifest_builder.apply_change(manifest_version, action); + } + RegionMetaAction::Edit(action) => { + manifest_builder.apply_edit(manifest_version, action); + } + RegionMetaAction::Remove(_) => { + debug!( + "Unhandled action for region {}, action: {:?}", + self.manifest.metadata.region_id, action + ); + } + RegionMetaAction::Truncate(action) => { + manifest_builder.apply_truncate(manifest_version, action); + } + } + } + } + + let new_manifest = manifest_builder.try_build()?; + ensure!( + new_manifest.manifest_version >= target_version, + InstallManifestToSnafu { + region_id: self.manifest.metadata.region_id, + target_version, + available_version: new_manifest.manifest_version, + last_version: self.last_version, + } + ); + + let version = self.last_version; + self.manifest = Arc::new(new_manifest); + self.last_version = self.manifest.manifest_version; + info!( + "Install manifest changes from {} to {}, region: {}", + version, self.last_version, self.manifest.metadata.region_id + ); + + Ok(self.last_version) + } + + /// Installs the last checkpoint. + pub(crate) async fn install_last_checkpoint(&mut self) -> Result { + let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await? + else { + return NoCheckpointSnafu { + region_id: self.manifest.metadata.region_id, + last_version: self.last_version, + } + .fail(); + }; + self.store.reset_manifest_size(); + self.store + .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size); + let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint); + let manifest = builder.try_build()?; + self.last_version = manifest.manifest_version; + self.manifest = Arc::new(manifest); + info!( + "Installed region manifest from checkpoint: {}, region: {}", + checkpoint.last_version, self.manifest.metadata.region_id + ); + + Ok(self.last_version) + } + /// Updates the manifest. Returns the current manifest version number. pub async fn update(&mut self, action_list: RegionMetaActionList) -> Result { let _t = MANIFEST_OP_ELAPSED @@ -371,14 +520,17 @@ impl RegionManifestManager { } /// Fetches the last [RegionCheckpoint] from storage. + /// + /// If the checkpoint is not found, returns `None`. + /// Otherwise, returns the checkpoint and the size of the checkpoint. pub(crate) async fn last_checkpoint( store: &mut ManifestObjectStore, - ) -> Result> { + ) -> Result> { let last_checkpoint = store.load_last_checkpoint().await?; if let Some((_, bytes)) = last_checkpoint { let checkpoint = RegionCheckpoint::decode(&bytes)?; - Ok(Some(checkpoint)) + Ok(Some((checkpoint, bytes.len() as u64))) } else { Ok(None) } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index be0ead8f88..c0ee01ba60 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -236,7 +236,31 @@ impl ManifestObjectStore { Ok(entries) } - /// Fetch all manifests in concurrent. + /// Fetches manifests in range [start_version, end_version). + /// + /// This functions is guaranteed to return manifests from the `start_version` strictly (must contain `start_version`). + pub async fn fetch_manifests_strict_from( + &self, + start_version: ManifestVersion, + end_version: ManifestVersion, + ) -> Result)>> { + let mut manifests = self.fetch_manifests(start_version, end_version).await?; + let start_index = manifests.iter().position(|(v, _)| *v == start_version); + debug!( + "fetches manifests in range [{},{}), start_index: {:?}", + start_version, end_version, start_index + ); + if let Some(start_index) = start_index { + Ok(manifests.split_off(start_index)) + } else { + Ok(vec![]) + } + } + + /// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version) + /// + /// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly. + /// Uses [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests from the `start_version`. pub async fn fetch_manifests( &self, start_version: ManifestVersion, @@ -576,6 +600,12 @@ impl ManifestObjectStore { self.manifest_size_map.read().unwrap().values().sum() } + /// Resets the size of all files. + pub(crate) fn reset_manifest_size(&mut self) { + self.manifest_size_map.write().unwrap().clear(); + self.total_manifest_size.store(0, Ordering::Relaxed); + } + /// Set the size of the delta file by delta version. pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { let mut m = self.manifest_size_map.write().unwrap(); @@ -585,7 +615,7 @@ impl ManifestObjectStore { } /// Set the size of the checkpoint file by checkpoint version. - fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) { + pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) { let mut m = self.manifest_size_map.write().unwrap(); m.insert(FileKey::Checkpoint(version), size); @@ -595,6 +625,7 @@ impl ManifestObjectStore { fn unset_file_size(&self, key: &FileKey) { let mut m = self.manifest_size_map.write().unwrap(); if let Some(val) = m.remove(key) { + debug!("Unset file size: {:?}, size: {}", key, val); self.dec_total_manifest_size(val); } } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 332d94be12..2ebf7cd5bf 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -44,6 +44,18 @@ async fn build_manager( (env, manager) } +async fn build_manager_with_initial_metadata( + env: &TestEnv, + checkpoint_distance: u64, + compress_type: CompressionType, +) -> RegionManifestManager { + let metadata = Arc::new(basic_region_metadata()); + env.create_manifest_manager(compress_type, checkpoint_distance, Some(metadata.clone())) + .await + .unwrap() + .unwrap() +} + async fn reopen_manager( env: &TestEnv, checkpoint_distance: u64, @@ -265,4 +277,142 @@ async fn generate_checkpoint_with_compression_types( .await .unwrap() .unwrap() + .0 +} + +fn generate_action_lists(num: usize) -> (Vec, Vec) { + let mut files = vec![]; + let mut actions = vec![]; + for _ in 0..num { + let file_id = FileId::random(); + files.push(file_id); + let file_meta = FileMeta { + region_id: RegionId::new(123, 456), + file_id, + time_range: (0.into(), 10000000.into()), + level: 0, + file_size: 1024000, + available_indexes: Default::default(), + index_file_size: 0, + num_rows: 0, + num_row_groups: 0, + sequence: None, + }; + let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { + files_to_add: vec![file_meta], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + })]); + actions.push(action); + } + (files, actions) +} + +#[tokio::test] +async fn manifest_install_manifest_to() { + common_telemetry::init_default_ut_logging(); + let (env, mut manager) = build_manager(0, CompressionType::Uncompressed).await; + let (files, actions) = generate_action_lists(10); + for action in actions { + manager.update(action).await.unwrap(); + } + + // Nothing to install + let target_version = manager.manifest().manifest_version; + let installed_version = manager.install_manifest_to(target_version).await.unwrap(); + assert_eq!(target_version, installed_version); + + let mut another_manager = + build_manager_with_initial_metadata(&env, 0, CompressionType::Uncompressed).await; + + // install manifest changes + let target_version = manager.manifest().manifest_version; + let installed_version = another_manager + .install_manifest_to(target_version - 1) + .await + .unwrap(); + assert_eq!(target_version - 1, installed_version); + for file_id in files[0..9].iter() { + assert!(another_manager.manifest().files.contains_key(file_id)); + } + + let installed_version = another_manager + .install_manifest_to(target_version) + .await + .unwrap(); + assert_eq!(target_version, installed_version); + for file_id in files.iter() { + assert!(another_manager.manifest().files.contains_key(file_id)); + } +} + +#[tokio::test] +async fn manifest_install_manifest_to_with_checkpoint() { + common_telemetry::init_default_ut_logging(); + let (env, mut manager) = build_manager(3, CompressionType::Uncompressed).await; + let (files, actions) = generate_action_lists(10); + for action in actions { + manager.update(action).await.unwrap(); + + while manager.checkpointer().is_doing_checkpoint() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + + // has checkpoint + assert!(manager + .store() + .load_last_checkpoint() + .await + .unwrap() + .is_some()); + + // check files + let mut expected = vec![ + "/", + "00000000000000000006.checkpoint", + "00000000000000000007.json", + "00000000000000000008.json", + "00000000000000000009.checkpoint", + "00000000000000000009.json", + "00000000000000000010.json", + "_last_checkpoint", + ]; + expected.sort_unstable(); + let mut paths = manager + .store() + .get_paths(|e| Some(e.name().to_string())) + .await + .unwrap(); + + paths.sort_unstable(); + assert_eq!(expected, paths); + + let mut another_manager = + build_manager_with_initial_metadata(&env, 0, CompressionType::Uncompressed).await; + + // Install 9 manifests + let target_version = manager.manifest().manifest_version; + let installed_version = another_manager + .install_manifest_to(target_version - 1) + .await + .unwrap(); + assert_eq!(target_version - 1, installed_version); + for file_id in files[0..9].iter() { + assert!(another_manager.manifest().files.contains_key(file_id)); + } + + // Install all manifests + let target_version = manager.manifest().manifest_version; + let installed_version = another_manager + .install_manifest_to(target_version) + .await + .unwrap(); + assert_eq!(target_version, installed_version); + for file_id in files.iter() { + assert!(another_manager.manifest().files.contains_key(file_id)); + } + assert_eq!(4217, another_manager.store().total_manifest_size()); }