feat: introduce install_manifest_to for RegionManifestManager (#5742)

* feat: introduce `install_manifest_changes` for `RegionManifestManager`

* chore: rename function to `install_manifest_to`

* Apply suggestions from code review

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* chore: add comments

* chore: add comments

* chore: update logic and add comments

* chore: add more check

* Update src/mito2/src/manifest/manager.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Weny Xu
2025-03-21 13:19:23 +08:00
committed by GitHub
parent a0739a96e4
commit 4eb0771afe
4 changed files with 388 additions and 8 deletions

View File

@@ -781,6 +781,50 @@ pub enum Error {
/// The actual checksum does not match the expected checksum.
#[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))]
ChecksumMismatch { actual: u32, expected: u32 },
/// No checkpoint is available for the region.
///
/// Returned by `install_last_checkpoint` when the manifest store has no last
/// checkpoint to load; `last_version` is the manager's version at that time.
#[snafu(display(
"No checkpoint found, region: {}, last_version: {}",
region_id,
last_version
))]
NoCheckpoint {
region_id: RegionId,
last_version: ManifestVersion,
#[snafu(implicit)]
location: Location,
},
/// No manifest deltas could be fetched for the half-open version range
/// `[start_version, end_version)`.
///
/// Returned by `install_manifest_to` when the strict fetch is still empty after
/// falling back to the last checkpoint.
#[snafu(display(
"No manifests found in range: [{}..{}), region: {}, last_version: {}",
start_version,
end_version,
region_id,
last_version
))]
NoManifests {
region_id: RegionId,
start_version: ManifestVersion,
end_version: ManifestVersion,
last_version: ManifestVersion,
#[snafu(implicit)]
location: Location,
},
/// The manifest rebuilt by `install_manifest_to` is still older than the
/// requested `target_version`.
///
/// `available_version` is the version of the rebuilt manifest and
/// `last_version` is the manager's version when the check failed.
// The display arguments must follow the placeholder order:
// target version, region, available version, last version.
#[snafu(display(
    "Failed to install manifest to {}, region: {}, available manifest version: {}, last version: {}",
    target_version,
    region_id,
    available_version,
    last_version
))]
InstallManifestTo {
    region_id: RegionId,
    target_version: ManifestVersion,
    available_version: ManifestVersion,
    #[snafu(implicit)]
    location: Location,
    last_version: ManifestVersion,
},
#[snafu(display("Region {} is stopped", region_id))]
RegionStopped {
region_id: RegionId,
@@ -1019,7 +1063,10 @@ impl ErrorExt for Error {
| OperateAbortedIndex { .. }
| UnexpectedReplay { .. }
| IndexEncodeNull { .. }
| UnexpectedImpureDefault { .. } => StatusCode::Unexpected,
| UnexpectedImpureDefault { .. }
| NoCheckpoint { .. }
| NoManifests { .. }
| InstallManifestTo { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }

View File

@@ -23,7 +23,9 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
use store_api::metadata::RegionMetadataRef;
use crate::error::{self, RegionStoppedSnafu, Result};
use crate::error::{
self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
};
use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
RegionMetaActionList,
@@ -197,9 +199,9 @@ impl RegionManifestManager {
let checkpoint = Self::last_checkpoint(&mut store).await?;
let last_checkpoint_version = checkpoint
.as_ref()
.map(|checkpoint| checkpoint.last_version)
.map(|(checkpoint, _)| checkpoint.last_version)
.unwrap_or(MIN_VERSION);
let mut manifest_builder = if let Some(checkpoint) = checkpoint {
let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
info!(
"Recover region manifest {} from checkpoint version {}",
options.manifest_dir, checkpoint.last_version
@@ -275,6 +277,153 @@ impl RegionManifestManager {
self.stopped = true;
}
/// Installs the manifest changes from the current version to the target version (inclusive).
///
/// Returns the installed version.
///
/// **Note**: This function is not guaranteed to install the target version strictly.
/// The installed version may be greater than the target version, e.g. when the
/// requested deltas are unavailable and the last checkpoint is installed instead.
///
/// # Errors
///
/// - `RegionStopped` if the manager is stopped and there is something to install
///   (the no-op path in Case 1 returns before this check).
/// - `NoCheckpoint` if the deltas are missing and no checkpoint can be loaded.
/// - `NoManifests` if no deltas can be fetched even after installing the last checkpoint.
/// - `InstallManifestTo` if the rebuilt manifest is still older than `target_version`.
pub async fn install_manifest_to(
&mut self,
target_version: ManifestVersion,
) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["install_manifest_to"])
.start_timer();
// Case 1: If the target version is less than or equal to the current version,
// there is nothing to install; return the current version.
if self.last_version >= target_version {
debug!(
"Target version {} is less than or equal to the current version {}, region: {}, skip install",
target_version, self.last_version, self.manifest.metadata.region_id
);
return Ok(self.last_version);
}
ensure!(
!self.stopped,
RegionStoppedSnafu {
region_id: self.manifest.metadata.region_id,
}
);
// Fetches manifests from the last version strictly.
let mut manifests = self
.store
// Invariant: last_version < target_version.
.fetch_manifests_strict_from(self.last_version + 1, target_version + 1)
.await?;
// Case 2: No manifests in range: [current_version+1, target_version+1)
//
// |---------Has been deleted------------| [Checkpoint Version]...[Latest Version]
// [Leader region]
// [Current Version]......[Target Version]
// [Follower region]
if manifests.is_empty() {
debug!(
"Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
self.last_version, self.manifest.metadata.region_id
);
// Installing the checkpoint replaces `self.manifest` and `self.last_version`.
let last_version = self.install_last_checkpoint().await?;
// Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version.
if last_version >= target_version {
return Ok(last_version);
}
// Fetches manifests from the installed version strictly.
manifests = self
.store
// Invariant: last_version < target_version.
.fetch_manifests_strict_from(last_version + 1, target_version + 1)
.await?;
}
// Still nothing to apply. `self.last_version` reflects the checkpoint version if
// one was installed above, so the reported range is the one requested last.
if manifests.is_empty() {
return NoManifestsSnafu {
region_id: self.manifest.metadata.region_id,
start_version: self.last_version + 1,
end_version: target_version + 1,
last_version: self.last_version,
}
.fail();
}
// `fetch_manifests_strict_from` only returns a non-empty list that begins at the
// requested start version.
debug_assert_eq!(manifests.first().unwrap().0, self.last_version + 1);
// Replays the fetched deltas on top of a copy of the current manifest.
let mut manifest_builder =
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
for (manifest_version, raw_action_list) in manifests {
// NOTE(review): the delta size is recorded before decoding and before the
// final version check, so it stays recorded even if this call fails later.
self.store
.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
manifest_builder.apply_change(manifest_version, action);
}
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(manifest_version, action);
}
// Remove actions are not applied to the builder; they are only logged.
RegionMetaAction::Remove(_) => {
debug!(
"Unhandled action for region {}, action: {:?}",
self.manifest.metadata.region_id, action
);
}
RegionMetaAction::Truncate(action) => {
manifest_builder.apply_truncate(manifest_version, action);
}
}
}
}
let new_manifest = manifest_builder.try_build()?;
// The fetched deltas may stop short of the target version; the rebuilt manifest
// is only installed when it reaches at least `target_version`.
ensure!(
new_manifest.manifest_version >= target_version,
InstallManifestToSnafu {
region_id: self.manifest.metadata.region_id,
target_version,
available_version: new_manifest.manifest_version,
last_version: self.last_version,
}
);
// Keeps the previous version only for the log line below.
let version = self.last_version;
self.manifest = Arc::new(new_manifest);
self.last_version = self.manifest.manifest_version;
info!(
"Install manifest changes from {} to {}, region: {}",
version, self.last_version, self.manifest.metadata.region_id
);
Ok(self.last_version)
}
/// Installs the last checkpoint as the current manifest.
///
/// Loads the last checkpoint from the store, rebuilds the manifest from it, and
/// then replaces the manager's manifest, version, and tracked manifest sizes.
/// Returns the installed version.
///
/// # Errors
///
/// - `NoCheckpoint` if the store has no last checkpoint.
/// - Any error from decoding the checkpoint or building the manifest. In that
///   case the manager's manifest, version, and size accounting are left untouched.
pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
    let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
    else {
        return NoCheckpointSnafu {
            region_id: self.manifest.metadata.region_id,
            last_version: self.last_version,
        }
        .fail();
    };
    let checkpoint_version = checkpoint.last_version;

    // Build the manifest before touching any state: if the build fails, the
    // size accounting must still describe the manifest that remains installed.
    let manifest = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint).try_build()?;

    // The checkpoint supersedes every file tracked so far, so the accounting
    // restarts from the checkpoint file alone.
    self.store.reset_manifest_size();
    self.store
        .set_checkpoint_file_size(checkpoint_version, checkpoint_size);
    self.last_version = manifest.manifest_version;
    self.manifest = Arc::new(manifest);
    info!(
        "Installed region manifest from checkpoint: {}, region: {}",
        checkpoint_version, self.manifest.metadata.region_id
    );

    Ok(self.last_version)
}
/// Updates the manifest. Returns the current manifest version number.
pub async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
@@ -371,14 +520,17 @@ impl RegionManifestManager {
}
/// Fetches the last [RegionCheckpoint] from storage.
///
/// If the checkpoint is not found, returns `None`.
/// Otherwise, returns the checkpoint and the size of the checkpoint.
pub(crate) async fn last_checkpoint(
store: &mut ManifestObjectStore,
) -> Result<Option<RegionCheckpoint>> {
) -> Result<Option<(RegionCheckpoint, u64)>> {
let last_checkpoint = store.load_last_checkpoint().await?;
if let Some((_, bytes)) = last_checkpoint {
let checkpoint = RegionCheckpoint::decode(&bytes)?;
Ok(Some(checkpoint))
Ok(Some((checkpoint, bytes.len() as u64)))
} else {
Ok(None)
}

View File

@@ -236,7 +236,31 @@ impl ManifestObjectStore {
Ok(entries)
}
/// Fetch all manifests in concurrent.
/// Fetches manifests in range [start_version, end_version).
///
/// This function is guaranteed to return manifests starting strictly from
/// `start_version`: the result either begins with the entry for `start_version`
/// or is empty when that entry was not fetched.
pub async fn fetch_manifests_strict_from(
    &self,
    start_version: ManifestVersion,
    end_version: ManifestVersion,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
    let mut fetched = self.fetch_manifests(start_version, end_version).await?;
    let start_index = fetched
        .iter()
        .position(|(version, _)| *version == start_version);
    debug!(
        "fetches manifests in range [{},{}), start_index: {:?}",
        start_version, end_version, start_index
    );

    // Keep everything from the entry for `start_version` onwards.
    let manifests = match start_index {
        Some(index) => fetched.split_off(index),
        None => vec![],
    };
    Ok(manifests)
}
/// Fetches all manifests concurrently and returns the manifests in range [start_version, end_version).
///
/// **Notes**: This function does not guarantee that the returned manifests start strictly from the `start_version`.
/// Use [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests starting from the `start_version`.
pub async fn fetch_manifests(
&self,
start_version: ManifestVersion,
@@ -576,6 +600,12 @@ impl ManifestObjectStore {
self.manifest_size_map.read().unwrap().values().sum()
}
/// Forgets every tracked file size and sets the total manifest size back to zero.
pub(crate) fn reset_manifest_size(&mut self) {
    {
        // The write guard is released at the end of this scope, before the
        // total is zeroed.
        let mut size_map = self.manifest_size_map.write().unwrap();
        size_map.clear();
    }
    self.total_manifest_size.store(0, Ordering::Relaxed);
}
/// Set the size of the delta file by delta version.
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
let mut m = self.manifest_size_map.write().unwrap();
@@ -585,7 +615,7 @@ impl ManifestObjectStore {
}
/// Set the size of the checkpoint file by checkpoint version.
fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
let mut m = self.manifest_size_map.write().unwrap();
m.insert(FileKey::Checkpoint(version), size);
@@ -595,6 +625,7 @@ impl ManifestObjectStore {
/// Stops tracking the size recorded for `key` and subtracts it from the total.
///
/// Does nothing when no size is recorded for `key`.
fn unset_file_size(&self, key: &FileKey) {
    let mut size_map = self.manifest_size_map.write().unwrap();
    let Some(removed_size) = size_map.remove(key) else {
        return;
    };
    debug!("Unset file size: {:?}, size: {}", key, removed_size);
    self.dec_total_manifest_size(removed_size);
}

View File

@@ -44,6 +44,18 @@ async fn build_manager(
(env, manager)
}
/// Creates a manifest manager in `env` that starts from `basic_region_metadata()`.
///
/// Panics if the manager cannot be created or if no manager is returned.
async fn build_manager_with_initial_metadata(
    env: &TestEnv,
    checkpoint_distance: u64,
    compress_type: CompressionType,
) -> RegionManifestManager {
    let initial_metadata = Arc::new(basic_region_metadata());
    let created = env
        .create_manifest_manager(compress_type, checkpoint_distance, Some(initial_metadata))
        .await
        .unwrap();
    created.unwrap()
}
async fn reopen_manager(
env: &TestEnv,
checkpoint_distance: u64,
@@ -265,4 +277,142 @@ async fn generate_checkpoint_with_compression_types(
.await
.unwrap()
.unwrap()
.0
}
/// Generates `num` action lists, each holding a single edit that adds one file
/// with a random id.
///
/// Returns the generated file ids together with the action lists, in the same order.
fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>) {
    (0..num)
        .map(|_| {
            let file_id = FileId::random();
            let edit = RegionEdit {
                files_to_add: vec![FileMeta {
                    region_id: RegionId::new(123, 456),
                    file_id,
                    time_range: (0.into(), 10000000.into()),
                    level: 0,
                    file_size: 1024000,
                    available_indexes: Default::default(),
                    index_file_size: 0,
                    num_rows: 0,
                    num_row_groups: 0,
                    sequence: None,
                }],
                files_to_remove: vec![],
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            };
            let action_list = RegionMetaActionList::new(vec![RegionMetaAction::Edit(edit)]);
            (file_id, action_list)
        })
        .unzip()
}
/// A second manager sharing the same environment catches up with the first one
/// through `install_manifest_to`, first to the previous version and then to the
/// latest one. No checkpoints are involved (checkpoint distance is 0).
#[tokio::test]
async fn manifest_install_manifest_to() {
    common_telemetry::init_default_ut_logging();
    let (env, mut leader) = build_manager(0, CompressionType::Uncompressed).await;
    let (file_ids, action_lists) = generate_action_lists(10);
    for action_list in action_lists {
        leader.update(action_list).await.unwrap();
    }

    // Nothing to install: the manager is already at the requested version.
    let latest_version = leader.manifest().manifest_version;
    let installed = leader.install_manifest_to(latest_version).await.unwrap();
    assert_eq!(latest_version, installed);

    let mut follower =
        build_manager_with_initial_metadata(&env, 0, CompressionType::Uncompressed).await;

    // Install everything except the last change.
    let latest_version = leader.manifest().manifest_version;
    let previous_version = latest_version - 1;
    let installed = follower.install_manifest_to(previous_version).await.unwrap();
    assert_eq!(previous_version, installed);
    for file_id in &file_ids[..9] {
        assert!(follower.manifest().files.contains_key(file_id));
    }

    // Install the remaining change.
    let installed = follower.install_manifest_to(latest_version).await.unwrap();
    assert_eq!(latest_version, installed);
    for file_id in &file_ids {
        assert!(follower.manifest().files.contains_key(file_id));
    }
}
/// Same scenario as the plain install test, but the first manager checkpoints
/// every 3 versions, so the earliest delta files are gone and the second manager
/// has to go through the last checkpoint.
#[tokio::test]
async fn manifest_install_manifest_to_with_checkpoint() {
    common_telemetry::init_default_ut_logging();
    let (env, mut leader) = build_manager(3, CompressionType::Uncompressed).await;
    let (file_ids, action_lists) = generate_action_lists(10);
    for action_list in action_lists {
        leader.update(action_list).await.unwrap();
        // Wait for any in-flight checkpoint before writing the next update.
        while leader.checkpointer().is_doing_checkpoint() {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    // A checkpoint must have been written.
    let last_checkpoint = leader.store().load_last_checkpoint().await.unwrap();
    assert!(last_checkpoint.is_some());

    // Check the files left in the manifest directory.
    let mut expected = vec![
        "/",
        "00000000000000000006.checkpoint",
        "00000000000000000007.json",
        "00000000000000000008.json",
        "00000000000000000009.checkpoint",
        "00000000000000000009.json",
        "00000000000000000010.json",
        "_last_checkpoint",
    ];
    expected.sort_unstable();
    let mut paths = leader
        .store()
        .get_paths(|entry| Some(entry.name().to_string()))
        .await
        .unwrap();
    paths.sort_unstable();
    assert_eq!(expected, paths);

    let mut follower =
        build_manager_with_initial_metadata(&env, 0, CompressionType::Uncompressed).await;

    // Install 9 manifests.
    let latest_version = leader.manifest().manifest_version;
    let previous_version = latest_version - 1;
    let installed = follower.install_manifest_to(previous_version).await.unwrap();
    assert_eq!(previous_version, installed);
    for file_id in &file_ids[..9] {
        assert!(follower.manifest().files.contains_key(file_id));
    }

    // Install all manifests.
    let latest_version = leader.manifest().manifest_version;
    let installed = follower.install_manifest_to(latest_version).await.unwrap();
    assert_eq!(latest_version, installed);
    for file_id in &file_ids {
        assert!(follower.manifest().files.contains_key(file_id));
    }

    assert_eq!(4217, follower.store().total_manifest_size());
}