From fcb77fd02503036243c51fd81e4c4767619bf0bb Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 15 May 2026 11:13:13 +0800 Subject: [PATCH] fix(mito): allow compaction publish during editing (#8097) * fix(mito): allow compaction publish during editing Allow compaction manifest updates while a region is in the transient editing state, and restrict direct region edits to add-only requests so compaction cannot race with external file removals. Files: - `src/mito2/src/compaction/compactor.rs` - `src/mito2/src/engine.rs` - `src/mito2/src/engine/edit_region_test.rs` - `src/mito2/src/region.rs` Signed-off-by: Lei, HUANG * Allow remove-only region edits - `edit_region`: restore direct `RegionEdit` validation for non-empty `files_to_add` or `files_to_remove` in `src/mito2/src/engine.rs` - `compaction`: document why compaction can publish during `Editing` while remove-capable sync-region edits are follower-only in `src/mito2/src/region.rs` - `tests`: update region-edit validation coverage and remove obsolete rejection coverage in `src/mito2/src/engine.rs` and `src/mito2/src/engine/edit_region_test.rs` Signed-off-by: Lei, HUANG * fix: logs Signed-off-by: Lei, HUANG * fix/editing-with-compaction: Document compaction staging behavior - `compaction`: clarify why `update_manifest_for_compaction` writes to the normal manifest path in `src/mito2/src/region.rs` - `staging`: document that staging SSTs stay outside normal region version control until staging exits in `src/mito2/src/region.rs` Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/mito2/src/compaction/compactor.rs | 2 +- src/mito2/src/engine.rs | 14 ++- src/mito2/src/region.rs | 152 ++++++++++++++++++++------ 3 files changed, 135 insertions(+), 33 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e5dae0af0c..4aaf84f626 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -620,7 +620,7 @@ where // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later. compaction_region .manifest_ctx - .update_manifest(RegionLeaderState::Writable, action_list, false) + .update_manifest_for_compaction(action_list) .await?; Ok(edit) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index b3fa6ce75b..d4c52c7162 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -1466,7 +1466,19 @@ mod tests { }; assert!(!is_valid_region_edit(&edit)); - // Valid: "files_to_remove" is not empty + // Valid: has only "files_to_remove" + let edit = RegionEdit { + files_to_add: vec![], + files_to_remove: vec![FileMeta::default()], + timestamp_ms: None, + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + committed_sequence: None, + }; + assert!(is_valid_region_edit(&edit)); + + // Valid: both "files_to_add" and "files_to_remove" are not empty let edit = RegionEdit { files_to_add: vec![FileMeta::default()], files_to_remove: vec![FileMeta::default()], diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 2e7b7a8ae5..ef88d6dd7c 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -977,6 +977,87 @@ impl ManifestContext { expect_state: RegionLeaderState, action_list: RegionMetaActionList, is_staging: bool, + ) -> Result { + self.update_manifest_with_state_check(action_list, is_staging, |current_state, region_id| { + // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading. + // + // A downgrading leader rejects user writes but still allows + // flushing the memtable and updating the manifest. + if expect_state != RegionLeaderState::Downgrading { + if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) { + info!( + "Region {} is in downgrading leader state, updating manifest. Expect state is {:?}", + region_id, expect_state + ); + } + ensure!( + current_state == RegionRoleState::Leader(expect_state) + || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading), + UpdateManifestSnafu { + region_id, + state: current_state, + } + ); + } else { + ensure!( + current_state == RegionRoleState::Leader(expect_state), + RegionStateSnafu { + region_id, + state: current_state, + expect: RegionRoleState::Leader(expect_state), + } + ); + } + + Ok(()) + }) + .await + } + + /// Updates the manifest for compaction. + /// + /// Compaction may finish while a direct external region edit is in the transient + /// `Editing` state. Direct external edits can remove files both when followers + /// apply sync-region metadata and when a writable leader performs a direct edit + /// such as `edit_region()`. Allowing compaction to publish in `Editing` is still + /// safe because publication happens under the manifest write lock and compaction + /// rechecks that its input files are still valid before committing. + /// + /// This intentionally writes to the normal manifest path (`is_staging = false`). + /// Entering staging cancels or waits for active compactions before switching the + /// region to `Staging`, so a compaction that started before staging still finishes + /// against the normal manifest. Even if a manual compaction is requested while the + /// region is already staging, compaction only sees SSTs in the normal visible + /// region version; SSTs from staging manifests are not applied to region version + /// control until staging exits successfully. + pub(crate) async fn update_manifest_for_compaction( + &self, + action_list: RegionMetaActionList, + ) -> Result { + self.update_manifest_with_state_check(action_list, false, |current_state, region_id| { + ensure!( + matches!( + current_state, + RegionRoleState::Leader(RegionLeaderState::Writable) + | RegionRoleState::Leader(RegionLeaderState::Editing) + | RegionRoleState::Leader(RegionLeaderState::Downgrading) + ), + UpdateManifestSnafu { + region_id, + state: current_state, + } + ); + + Ok(()) + }) + .await + } + + async fn update_manifest_with_state_check( + &self, + action_list: RegionMetaActionList, + is_staging: bool, + check_state: impl FnOnce(RegionRoleState, RegionId) -> Result<()>, ) -> Result { // Acquires the write lock of the manifest manager. let mut manager = self.manifest_manager.write().await; @@ -985,36 +1066,7 @@ impl ManifestContext { // Checks state inside the lock. This is to ensure that we won't update the manifest // after `set_readonly_gracefully()` is called. let current_state = self.state.load(); - - // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading. - // - // A downgrading leader rejects user writes but still allows - // flushing the memtable and updating the manifest. - if expect_state != RegionLeaderState::Downgrading { - if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) { - info!( - "Region {} is in downgrading leader state, updating manifest. state is {:?}", - manifest.metadata.region_id, expect_state - ); - } - ensure!( - current_state == RegionRoleState::Leader(expect_state) - || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading), - UpdateManifestSnafu { - region_id: manifest.metadata.region_id, - state: current_state, - } - ); - } else { - ensure!( - current_state == RegionRoleState::Leader(expect_state), - RegionStateSnafu { - region_id: manifest.metadata.region_id, - state: current_state, - expect: RegionRoleState::Leader(expect_state), - } - ); - } + check_state(current_state, manifest.metadata.region_id)?; for action in &action_list.actions { // Checks whether the edit is still applicable. @@ -1562,7 +1614,7 @@ mod tests { use store_api::logstore::provider::Provider; use store_api::region_engine::RegionRole; use store_api::region_request::PathType; - use store_api::storage::RegionId; + use store_api::storage::{FileId, RegionId}; use crate::access_layer::AccessLayer; use crate::error::Error; @@ -1640,6 +1692,44 @@ mod tests { } } + #[tokio::test] + async fn test_compaction_update_manifest_allows_editing_state() { + let env = SchedulerEnv::new().await; + let region = build_test_region(&env).await; + region.set_editing(RegionLeaderState::Writable).unwrap(); + + let file_id = FileId::random(); + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(RegionEdit { + files_to_add: vec![crate::sst::file::FileMeta { + region_id: region.region_id, + file_id, + level: 1, + ..Default::default() + }], + files_to_remove: Vec::new(), + timestamp_ms: None, + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + committed_sequence: None, + })); + + region + .manifest_ctx + .update_manifest_for_compaction(action_list) + .await + .unwrap(); + + assert!( + region + .manifest_ctx + .manifest() + .await + .files + .contains_key(&file_id) + ); + } + #[tokio::test] async fn test_exit_staging_partition_expr_change_and_edit_success() { let env = SchedulerEnv::new().await;