mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-17 13:30:38 +00:00
fix(mito): allow compaction publish during editing (#8097)
* fix(mito): allow compaction publish during editing Allow compaction manifest updates while a region is in the transient editing state, and restrict direct region edits to add-only requests so compaction cannot race with external file removals. Files: - `src/mito2/src/compaction/compactor.rs` - `src/mito2/src/engine.rs` - `src/mito2/src/engine/edit_region_test.rs` - `src/mito2/src/region.rs` Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * Allow remove-only region edits - `edit_region`: restore direct `RegionEdit` validation for non-empty `files_to_add` or `files_to_remove` in `src/mito2/src/engine.rs` - `compaction`: document why compaction can publish during `Editing` while remove-capable sync-region edits are follower-only in `src/mito2/src/region.rs` - `tests`: update region-edit validation coverage and remove obsolete rejection coverage in `src/mito2/src/engine.rs` and `src/mito2/src/engine/edit_region_test.rs` Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix: logs Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix/editing-with-compaction: Document compaction staging behavior - `compaction`: clarify why `update_manifest_for_compaction` writes to the normal manifest path in `src/mito2/src/region.rs` - `staging`: document that staging SSTs stay outside normal region version control until staging exits in `src/mito2/src/region.rs` Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
@@ -620,7 +620,7 @@ where
|
||||
// TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
|
||||
compaction_region
|
||||
.manifest_ctx
|
||||
.update_manifest(RegionLeaderState::Writable, action_list, false)
|
||||
.update_manifest_for_compaction(action_list)
|
||||
.await?;
|
||||
|
||||
Ok(edit)
|
||||
|
||||
@@ -1466,7 +1466,19 @@ mod tests {
|
||||
};
|
||||
assert!(!is_valid_region_edit(&edit));
|
||||
|
||||
// Valid: "files_to_remove" is not empty
|
||||
// Valid: has only "files_to_remove"
|
||||
let edit = RegionEdit {
|
||||
files_to_add: vec![],
|
||||
files_to_remove: vec![FileMeta::default()],
|
||||
timestamp_ms: None,
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
};
|
||||
assert!(is_valid_region_edit(&edit));
|
||||
|
||||
// Valid: both "files_to_add" and "files_to_remove" are not empty
|
||||
let edit = RegionEdit {
|
||||
files_to_add: vec![FileMeta::default()],
|
||||
files_to_remove: vec![FileMeta::default()],
|
||||
|
||||
@@ -977,6 +977,87 @@ impl ManifestContext {
|
||||
expect_state: RegionLeaderState,
|
||||
action_list: RegionMetaActionList,
|
||||
is_staging: bool,
|
||||
) -> Result<ManifestVersion> {
|
||||
self.update_manifest_with_state_check(action_list, is_staging, |current_state, region_id| {
|
||||
// If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
|
||||
//
|
||||
// A downgrading leader rejects user writes but still allows
|
||||
// flushing the memtable and updating the manifest.
|
||||
if expect_state != RegionLeaderState::Downgrading {
|
||||
if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
|
||||
info!(
|
||||
"Region {} is in downgrading leader state, updating manifest. Expect state is {:?}",
|
||||
region_id, expect_state
|
||||
);
|
||||
}
|
||||
ensure!(
|
||||
current_state == RegionRoleState::Leader(expect_state)
|
||||
|| current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
|
||||
UpdateManifestSnafu {
|
||||
region_id,
|
||||
state: current_state,
|
||||
}
|
||||
);
|
||||
} else {
|
||||
ensure!(
|
||||
current_state == RegionRoleState::Leader(expect_state),
|
||||
RegionStateSnafu {
|
||||
region_id,
|
||||
state: current_state,
|
||||
expect: RegionRoleState::Leader(expect_state),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Updates the manifest for compaction.
|
||||
///
|
||||
/// Compaction may finish while a direct external region edit is in the transient
|
||||
/// `Editing` state. Direct external edits can remove files both when followers
|
||||
/// apply sync-region metadata and when a writable leader performs a direct edit
|
||||
/// such as `edit_region()`. Allowing compaction to publish in `Editing` is still
|
||||
/// safe because publication happens under the manifest write lock and compaction
|
||||
/// rechecks that its input files are still valid before committing.
|
||||
///
|
||||
/// This intentionally writes to the normal manifest path (`is_staging = false`).
|
||||
/// Entering staging cancels or waits for active compactions before switching the
|
||||
/// region to `Staging`, so a compaction that started before staging still finishes
|
||||
/// against the normal manifest. Even if a manual compaction is requested while the
|
||||
/// region is already staging, compaction only sees SSTs in the normal visible
|
||||
/// region version; SSTs from staging manifests are not applied to region version
|
||||
/// control until staging exits successfully.
|
||||
pub(crate) async fn update_manifest_for_compaction(
|
||||
&self,
|
||||
action_list: RegionMetaActionList,
|
||||
) -> Result<ManifestVersion> {
|
||||
self.update_manifest_with_state_check(action_list, false, |current_state, region_id| {
|
||||
ensure!(
|
||||
matches!(
|
||||
current_state,
|
||||
RegionRoleState::Leader(RegionLeaderState::Writable)
|
||||
| RegionRoleState::Leader(RegionLeaderState::Editing)
|
||||
| RegionRoleState::Leader(RegionLeaderState::Downgrading)
|
||||
),
|
||||
UpdateManifestSnafu {
|
||||
region_id,
|
||||
state: current_state,
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn update_manifest_with_state_check(
|
||||
&self,
|
||||
action_list: RegionMetaActionList,
|
||||
is_staging: bool,
|
||||
check_state: impl FnOnce(RegionRoleState, RegionId) -> Result<()>,
|
||||
) -> Result<ManifestVersion> {
|
||||
// Acquires the write lock of the manifest manager.
|
||||
let mut manager = self.manifest_manager.write().await;
|
||||
@@ -985,36 +1066,7 @@ impl ManifestContext {
|
||||
// Checks state inside the lock. This is to ensure that we won't update the manifest
|
||||
// after `set_readonly_gracefully()` is called.
|
||||
let current_state = self.state.load();
|
||||
|
||||
// If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
|
||||
//
|
||||
// A downgrading leader rejects user writes but still allows
|
||||
// flushing the memtable and updating the manifest.
|
||||
if expect_state != RegionLeaderState::Downgrading {
|
||||
if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
|
||||
info!(
|
||||
"Region {} is in downgrading leader state, updating manifest. state is {:?}",
|
||||
manifest.metadata.region_id, expect_state
|
||||
);
|
||||
}
|
||||
ensure!(
|
||||
current_state == RegionRoleState::Leader(expect_state)
|
||||
|| current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
|
||||
UpdateManifestSnafu {
|
||||
region_id: manifest.metadata.region_id,
|
||||
state: current_state,
|
||||
}
|
||||
);
|
||||
} else {
|
||||
ensure!(
|
||||
current_state == RegionRoleState::Leader(expect_state),
|
||||
RegionStateSnafu {
|
||||
region_id: manifest.metadata.region_id,
|
||||
state: current_state,
|
||||
expect: RegionRoleState::Leader(expect_state),
|
||||
}
|
||||
);
|
||||
}
|
||||
check_state(current_state, manifest.metadata.region_id)?;
|
||||
|
||||
for action in &action_list.actions {
|
||||
// Checks whether the edit is still applicable.
|
||||
@@ -1562,7 +1614,7 @@ mod tests {
|
||||
use store_api::logstore::provider::Provider;
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::region_request::PathType;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::{FileId, RegionId};
|
||||
|
||||
use crate::access_layer::AccessLayer;
|
||||
use crate::error::Error;
|
||||
@@ -1640,6 +1692,44 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_update_manifest_allows_editing_state() {
|
||||
let env = SchedulerEnv::new().await;
|
||||
let region = build_test_region(&env).await;
|
||||
region.set_editing(RegionLeaderState::Writable).unwrap();
|
||||
|
||||
let file_id = FileId::random();
|
||||
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(RegionEdit {
|
||||
files_to_add: vec![crate::sst::file::FileMeta {
|
||||
region_id: region.region_id,
|
||||
file_id,
|
||||
level: 1,
|
||||
..Default::default()
|
||||
}],
|
||||
files_to_remove: Vec::new(),
|
||||
timestamp_ms: None,
|
||||
compaction_time_window: None,
|
||||
flushed_entry_id: None,
|
||||
flushed_sequence: None,
|
||||
committed_sequence: None,
|
||||
}));
|
||||
|
||||
region
|
||||
.manifest_ctx
|
||||
.update_manifest_for_compaction(action_list)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
region
|
||||
.manifest_ctx
|
||||
.manifest()
|
||||
.await
|
||||
.files
|
||||
.contains_key(&file_id)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_exit_staging_partition_expr_change_and_edit_success() {
|
||||
let env = SchedulerEnv::new().await;
|
||||
|
||||
Reference in New Issue
Block a user