feat: exiting staging mode on success case (#6913)

* clear staging directories on entering staging mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test and merger

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* storage accessor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* exit on success

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use remove_all

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Ruihang Xia
2025-09-09 18:52:55 -07:00
committed by GitHub
parent 83290be3ba
commit 2f6da3b718
5 changed files with 382 additions and 47 deletions
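
For orientation, here is a minimal sketch of the lifecycle this commit completes, written against the engine API exercised in the test below (a sketch only: error handling is elided, and the `MitoEngine` and `Result` bindings are assumed from the test context):

async fn staging_round_trip(engine: &MitoEngine, region_id: RegionId) -> Result<()> {
    // Enter staging mode; leftover staging manifests from an interrupted
    // session are cleared on entry.
    engine
        .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
        .await?;

    // Writes and flushes in this window produce deltas under
    // staging/manifest instead of the normal manifest directory.

    // Exit on success: staged deltas are merged into one action list,
    // committed to the normal manifest directory, and the staging
    // directory is removed.
    engine
        .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
        .await?;
    Ok(())
}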


@@ -263,3 +263,115 @@ async fn test_staging_manifest_directory() {
"Staging manifest directory should contain files"
);
}
#[tokio::test]
async fn test_staging_exit_success_with_manifests() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1024, 0);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
// Create region
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Enter staging mode
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
// Add some data and flush in staging mode to generate staging manifests
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 5),
};
put_rows(&engine, region_id, rows_data).await;
// Force flush to generate staging manifests
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// Add more data and flush again to generate multiple staging manifests
let rows_data2 = Rows {
schema: column_schemas.clone(),
rows: build_rows(5, 10),
};
put_rows(&engine, region_id, rows_data2).await;
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// Verify we're in staging mode and staging manifests exist
let data_home = env.data_home();
let region_dir = format!("{}/data/test/1024_0000000000", data_home.display());
let staging_manifest_dir = format!("{}/staging/manifest", region_dir);
let staging_files_before = fs::read_dir(&staging_manifest_dir)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert!(
!staging_files_before.is_empty(),
"Staging manifest directory should contain files before exit"
);
// Count normal manifest files before exit
let normal_manifest_dir = format!("{}/manifest", region_dir);
let normal_files_before = fs::read_dir(&normal_manifest_dir)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
let normal_count_before = normal_files_before.len();
// Exit staging mode successfully
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await
.unwrap();
// Verify we're back in normal mode
let workers = &engine.inner.workers;
let region = workers.get_region(region_id).unwrap();
assert!(
!region.is_staging(),
"Region should no longer be in staging mode"
);
// Verify staging manifests have been cleared
let staging_files_after = fs::read_dir(&staging_manifest_dir)
.map(|entries| entries.collect::<Result<Vec<_>, _>>().unwrap_or_default())
.unwrap_or_default();
assert!(
staging_files_after.is_empty(),
"Staging manifest directory should be empty after successful exit"
);
// Verify normal manifests contain the merged changes
let normal_files_after = fs::read_dir(&normal_manifest_dir)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert!(
normal_files_after.len() > normal_count_before,
"Normal manifest directory should contain more files after merge"
);
}
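
For reference, the on-disk layout the test asserts against (paths as built above; `1024_0000000000` is the directory name derived from the region id):

<data_home>/data/test/1024_0000000000/
    manifest/            normal manifests; the merged result lands here
    staging/manifest/    staged deltas; removed on successful exit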


@@ -547,7 +547,7 @@ impl RegionManifestManager {
let streamer =
self.store
.manifest_lister()
.manifest_lister(false)
.await?
.context(error::EmptyManifestDirSnafu {
manifest_dir: self.store.manifest_dir(),
@@ -611,6 +611,63 @@ impl RegionManifestManager {
pub(crate) fn checkpointer(&self) -> &Checkpointer {
&self.checkpointer
}
/// Merge all staged manifest actions into a single action list ready for submission.
/// This collects all staging manifests, applies them sequentially, and returns the merged actions.
pub(crate) async fn merge_staged_actions(
&mut self,
region_state: RegionRoleState,
) -> Result<Option<RegionMetaActionList>> {
// Only merge if we're in staging mode
if region_state != RegionRoleState::Leader(RegionLeaderState::Staging) {
return Ok(None);
}
// Fetch all staging manifests
let staging_manifests = self.store.fetch_staging_manifests().await?;
if staging_manifests.is_empty() {
info!(
"No staging manifests to merge for region {}",
self.manifest.metadata.region_id
);
return Ok(None);
}
info!(
"Merging {} staging manifests for region {}",
staging_manifests.len(),
self.manifest.metadata.region_id
);
// Start with current manifest state as the base
let mut merged_actions = Vec::new();
let mut latest_version = self.last_version();
// Apply all staging actions in order
for (manifest_version, raw_action_list) in staging_manifests {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
for action in action_list.actions {
merged_actions.push(action);
}
latest_version = latest_version.max(manifest_version);
}
if merged_actions.is_empty() {
return Ok(None);
}
info!(
"Successfully merged {} actions from staging manifests for region {}, latest version: {}",
merged_actions.len(),
self.manifest.metadata.region_id,
latest_version
);
Ok(Some(RegionMetaActionList::new(merged_actions)))
}
}
#[cfg(test)]
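
To make the merge concrete, a hedged sketch of what `merge_staged_actions` produces when two staged deltas exist (the action names are illustrative; only the concatenation order is taken from the loop above):

// Staged deltas, in the order returned by fetch_staging_manifests:
//   version 0: [Edit(add file-1)]
//   version 1: [Edit(add file-2)]
//
// merge_staged_actions flattens them into one list, preserving order:
//   RegionMetaActionList::new(vec![Edit(add file-1), Edit(add file-2)])
// which exit_staging_on_success then submits once against the normal
// manifest directory.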


@@ -189,26 +189,31 @@ impl ManifestObjectStore {
&self.path
}
/// Returns a iterator of manifests.
pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
match self.object_store.lister_with(&self.path).await {
/// Returns an iterator of manifests from the normal or staging directory.
pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
let path = if is_staging {
&self.staging_path
} else {
&self.path
};
match self.object_store.lister_with(path).await {
Ok(streamer) => Ok(Some(streamer)),
Err(e) if e.kind() == ErrorKind::NotFound => {
debug!("Manifest directory does not exists: {}", self.path);
debug!("Manifest directory does not exist: {}", path);
Ok(None)
}
Err(e) => Err(e).context(OpenDalSnafu)?,
}
}
/// Return all `R`s in the root directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
/// Return all `R`s in the directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
/// and discard the `R`s that do not meet the conditions (that is, the `filter` closure returns `None`).
/// Return an empty vector when the directory is not found.
pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
where
F: Fn(Entry) -> Option<R>,
{
let Some(streamer) = self.manifest_lister().await? else {
let Some(streamer) = self.manifest_lister(is_staging).await? else {
return Ok(vec![]);
};
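
A hedged usage sketch of the extended `get_paths` signature (it mirrors the calls in the tests below; the `store: &ManifestObjectStore` binding is assumed):

// Collect the file names of every staged manifest; pass `false` to list
// the normal manifest directory instead.
let staged_names: Vec<String> = store
    .get_paths(|entry| Some(entry.name().to_string()), true)
    .await?;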
@@ -233,16 +238,19 @@ impl ManifestObjectStore {
ensure!(start <= end, InvalidScanIndexSnafu { start, end });
let mut entries: Vec<(ManifestVersion, Entry)> = self
.get_paths(|entry| {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
if start <= version && version < end {
return Some((version, entry));
.get_paths(
|entry| {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
if start <= version && version < end {
return Some((version, entry));
}
}
}
None
})
None
},
false,
)
.await?;
Self::sort_manifests(&mut entries);
@@ -276,21 +284,19 @@ impl ManifestObjectStore {
}
}
/// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version)
///
/// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly.
/// Uses [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests from the `start_version`.
pub async fn fetch_manifests(
/// Common implementation for fetching manifests from entries in parallel.
async fn fetch_manifests_from_entries(
&self,
start_version: ManifestVersion,
end_version: ManifestVersion,
entries: Vec<(ManifestVersion, Entry)>,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let manifests = self.scan(start_version, end_version).await?;
if entries.is_empty() {
return Ok(vec![]);
}
// TODO(weny): Make it configurable.
let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
let tasks = manifests.iter().map(|(v, entry)| async {
let tasks = entries.iter().map(|(v, entry)| async {
// Safety: semaphore must exist.
let _permit = semaphore.acquire().await.unwrap();
@@ -313,6 +319,19 @@ impl ManifestObjectStore {
try_join_all(tasks).await
}
/// Fetch all manifests concurrently, and return the manifests in range [start_version, end_version)
///
/// **Notes**: This function does not guarantee to return manifests from the `start_version` strictly.
/// Use [fetch_manifests_strict_from](ManifestObjectStore::fetch_manifests_strict_from) to get manifests from the `start_version`.
pub async fn fetch_manifests(
&self,
start_version: ManifestVersion,
end_version: ManifestVersion,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let manifests = self.scan(start_version, end_version).await?;
self.fetch_manifests_from_entries(manifests).await
}
/// Delete manifest files whose version < end.
/// If keep_last_checkpoint is true, the last checkpoint file will be kept.
/// ### Return
@@ -324,17 +343,20 @@ impl ManifestObjectStore {
) -> Result<usize> {
// Stores (entry, is_checkpoint, version) in a Vec.
let entries: Vec<_> = self
.get_paths(|entry| {
let file_name = entry.name();
let is_checkpoint = is_checkpoint_file(file_name);
if is_delta_file(file_name) || is_checkpoint_file(file_name) {
let version = file_version(file_name);
if version < end {
return Some((entry, is_checkpoint, version));
.get_paths(
|entry| {
let file_name = entry.name();
let is_checkpoint = is_checkpoint_file(file_name);
if is_delta_file(file_name) || is_checkpoint_file(file_name) {
let version = file_version(file_name);
if version < end {
return Some((entry, is_checkpoint, version));
}
}
}
None
})
None
},
false,
)
.await?;
let checkpoint_version = if keep_last_checkpoint {
// Note that the order of entries is unspecified.
@@ -653,6 +675,44 @@ impl ManifestObjectStore {
fn dec_total_manifest_size(&self, val: u64) {
self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
}
/// Fetch all staging manifest files and return them as (version, action_list) pairs.
pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let manifest_entries = self
.get_paths(
|entry| {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
Some((version, entry))
} else {
None
}
},
true,
)
.await?;
let mut sorted_entries = manifest_entries;
Self::sort_manifests(&mut sorted_entries);
self.fetch_manifests_from_entries(sorted_entries).await
}
/// Clear all staging manifest files.
pub async fn clear_staging_manifests(&mut self) -> Result<()> {
self.object_store
.remove_all(&self.staging_path)
.await
.context(OpenDalSnafu)?;
debug!(
"Cleared all staging manifest files from {}",
self.staging_path
);
Ok(())
}
}
#[derive(Serialize, Deserialize, Debug)]
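
Taken together, a hedged sketch of how the two new helpers combine on the exit path (this mirrors `exit_staging_on_success` in region.rs below; `store` is assumed to be a mutable `ManifestObjectStore`):

// Read staged deltas in version order, merge their actions, then drop
// the whole staging directory via remove_all.
let staged = store.fetch_staging_manifests().await?;
let mut merged = Vec::new();
for (_version, bytes) in &staged {
    merged.extend(RegionMetaActionList::decode(bytes)?.actions);
}
// ... submit RegionMetaActionList::new(merged) against the normal directory ...
store.clear_staging_manifests().await?;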


@@ -123,7 +123,7 @@ async fn manager_without_checkpoint() {
expected.sort_unstable();
let mut paths = manager
.store()
.get_paths(|e| Some(e.name().to_string()))
.get_paths(|e| Some(e.name().to_string()), false)
.await
.unwrap();
paths.sort_unstable();
@@ -171,7 +171,7 @@ async fn manager_with_checkpoint_distance_1() {
expected.sort_unstable();
let mut paths = manager
.store()
.get_paths(|e| Some(e.name().to_string()))
.get_paths(|e| Some(e.name().to_string()), false)
.await
.unwrap();
paths.sort_unstable();
@@ -427,7 +427,7 @@ async fn manifest_install_manifest_to_with_checkpoint() {
expected.sort_unstable();
let mut paths = manager
.store()
.get_paths(|e| Some(e.name().to_string()))
.get_paths(|e| Some(e.name().to_string()), false)
.await
.unwrap();


@@ -35,6 +35,7 @@ use store_api::region_engine::{
};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;
use crate::access_layer::AccessLayerRef;
use crate::error::{
@@ -299,9 +300,16 @@ impl MitoRegion {
}
/// Sets the staging state.
///
/// You should call this method in the worker loop.
/// Transitions from Writable to Staging state.
pub(crate) fn set_staging(&self) -> Result<()> {
/// Cleans any existing staging manifests before entering staging mode.
pub(crate) async fn set_staging(
&self,
manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
) -> Result<()> {
manager.store().clear_staging_manifests().await?;
self.compare_exchange_state(
RegionLeaderState::Writable,
RegionRoleState::Leader(RegionLeaderState::Staging),
@@ -309,9 +317,10 @@ impl MitoRegion {
}
/// Exits the staging state back to writable.
///
/// You should call this method in the worker loop.
/// Transitions from Staging to Writable state.
pub(crate) fn exit_staging(&self) -> Result<()> {
fn exit_staging(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Staging,
RegionRoleState::Leader(RegionLeaderState::Writable),
@@ -333,7 +342,8 @@ impl MitoRegion {
match current_state {
RegionRoleState::Leader(RegionLeaderState::Staging) => {
info!("Exiting staging mode for region {}", self.region_id);
self.exit_staging()?;
// Use the success exit path that merges all staged manifests
self.exit_staging_on_success(&mut manager).await?;
}
RegionRoleState::Leader(RegionLeaderState::Writable) => {
// Already in desired state - no-op
@@ -357,7 +367,7 @@ impl MitoRegion {
match current_state {
RegionRoleState::Leader(RegionLeaderState::Writable) => {
info!("Entering staging mode for region {}", self.region_id);
self.set_staging()?;
self.set_staging(&mut manager).await?;
}
RegionRoleState::Leader(RegionLeaderState::Staging) => {
// Already in desired state - no-op
@@ -455,6 +465,8 @@ impl MitoRegion {
}
}
drop(manager);
Ok(())
}
@@ -576,6 +588,52 @@ impl MitoRegion {
})
.collect()
}
/// Exit staging mode successfully by merging all staged manifests and making them visible.
pub(crate) async fn exit_staging_on_success(
&self,
manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
) -> Result<()> {
let current_state = self.manifest_ctx.current_state();
ensure!(
current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
RegionStateSnafu {
region_id: self.region_id,
state: current_state,
expect: RegionRoleState::Leader(RegionLeaderState::Staging),
}
);
// Merge all staged manifest actions
let merged_actions = match manager.merge_staged_actions(current_state).await? {
Some(actions) => actions,
None => {
info!(
"No staged manifests to merge for region {}, exiting staging mode without changes",
self.region_id
);
// Even with no manifests to merge, we still need to exit staging mode
self.exit_staging()?;
return Ok(());
}
};
// Submit merged actions using the manifest manager's update method
// Pass the target state (Writable) so it saves to the normal directory, not the staging one
let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
let new_version = manager.update(merged_actions, target_state).await?;
info!(
"Successfully submitted merged staged manifests for region {}, new version: {}",
self.region_id, new_version
);
// Clear all staging manifests and transition the state
manager.store().clear_staging_manifests().await?;
self.exit_staging()?;
Ok(())
}
}
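
In summary, the leader-state transitions involved (as enforced by `compare_exchange_state` above; this restates the diff, no new behavior):

Writable --set_staging-----------------> Staging    (staging dir cleared on entry)
Staging  --exit_staging_on_success-----> Writable   (merge, commit, clear staging dir)
Staging  --exit_staging (internal)-----> Writable   (state flip only, no merge)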
/// Context to update the region manifest.
@@ -1078,6 +1136,7 @@ mod tests {
use store_api::storage::RegionId;
use crate::access_layer::AccessLayer;
use crate::manifest::action::RegionMetaActionList;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::{
ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
@@ -1245,7 +1304,7 @@ mod tests {
region_id: metadata.region_id,
version_control,
access_layer,
manifest_ctx,
manifest_ctx: manifest_ctx.clone(),
file_purger: crate::test_util::new_noop_file_purger(),
provider: Provider::noop_provider(),
last_flush_millis: Default::default(),
@@ -1265,7 +1324,9 @@ mod tests {
assert!(!region.is_staging());
// Test transition to staging
region.set_staging().unwrap();
let mut manager = manifest_ctx.manifest_manager.write().await;
region.set_staging(&mut manager).await.unwrap();
drop(manager);
assert_eq!(
region.state(),
RegionRoleState::Leader(RegionLeaderState::Staging)
@@ -1280,9 +1341,54 @@ mod tests {
);
assert!(!region.is_staging());
// Test staging directory cleanup: Create dirty staging files before entering staging mode
{
// Create some dummy staging manifest files to simulate an interrupted session
let manager = manifest_ctx.manifest_manager.write().await;
let dummy_actions = RegionMetaActionList::new(vec![]);
let dummy_bytes = dummy_actions.encode().unwrap();
// Create dirty staging files with versions 100 and 101
manager.store().save(100, &dummy_bytes, true).await.unwrap();
manager.store().save(101, &dummy_bytes, true).await.unwrap();
drop(manager);
// Verify dirty files exist before entering staging
let manager = manifest_ctx.manifest_manager.read().await;
let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
assert_eq!(
dirty_manifests.len(),
2,
"Should have 2 dirty staging files"
);
drop(manager);
// Enter staging mode - this should clean up the dirty files
let mut manager = manifest_ctx.manifest_manager.write().await;
region.set_staging(&mut manager).await.unwrap();
drop(manager);
// Verify dirty files are cleaned up after entering staging
let manager = manifest_ctx.manifest_manager.read().await;
let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
assert_eq!(
cleaned_manifests.len(),
0,
"Dirty staging files should be cleaned up"
);
drop(manager);
// Exit staging to restore normal state for remaining tests
region.exit_staging().unwrap();
}
// Test invalid transitions
assert!(region.set_staging().is_ok()); // Writable -> Staging should work
assert!(region.set_staging().is_err()); // Staging -> Staging should fail
let mut manager = manifest_ctx.manifest_manager.write().await;
assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
drop(manager);
let mut manager = manifest_ctx.manifest_manager.write().await;
assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
drop(manager);
assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
}