fix: staging mode with proper region edit operations (#6962)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-09-11 21:39:42 -07:00
committed by GitHub
parent 6c066c1a4a
commit d86f489a74
9 changed files with 123 additions and 24 deletions

View File

@@ -17,11 +17,12 @@
use std::fs;
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::{RegionEngine, SettableRegionRoleState};
use store_api::region_request::{
RegionAlterRequest, RegionFlushRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::region::{RegionLeaderState, RegionRoleState};
@@ -329,9 +330,10 @@ async fn test_staging_exit_success_with_manifests() {
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert!(
!staging_files_before.is_empty(),
"Staging manifest directory should contain files before exit"
assert_eq!(
staging_files_before.len(),
2,
"Staging manifest directory should contain two files before exit"
);
// Count normal manifest files before exit
@@ -341,6 +343,31 @@ async fn test_staging_exit_success_with_manifests() {
.collect::<Result<Vec<_>, _>>()
.unwrap();
let normal_count_before = normal_files_before.len();
assert_eq!(
normal_count_before, 1,
"Normal manifest directory should initially contain one file"
);
// Try read data before exiting staging, SST files should be invisible
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(
scanner.num_files(),
0,
"No SST files should be scanned before exit"
);
assert_eq!(
scanner.num_memtables(),
0,
"Memtables should be removed in staging before exit"
);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(
total_rows, 0,
"No data should be readable before exit staging mode"
);
// Exit staging mode successfully
engine
@@ -374,4 +401,27 @@ async fn test_staging_exit_success_with_manifests() {
normal_files_after.len() > normal_count_before,
"Normal manifest directory should contain more files after merge"
);
// Validate in-memory version reflects merged manifests (files visible in levels)
let version = region.version();
let levels = version.ssts.levels();
assert!(
!levels.is_empty() && !levels[0].files.is_empty(),
"SST levels should have files after exiting staging"
);
// Also ensure scanner behavior reflects 2 SSTs
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(
scanner.num_files(),
2,
"SST files should be scanned after exit"
);
// Try reading data via scanner to ensure previous staged data is actually readable
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 10, "Expected to read all staged rows");
}

View File

@@ -1012,14 +1012,14 @@ mod tests {
.collect();
// Assumes the flush job is finished.
version_control.apply_edit(
RegionEdit {
Some(RegionEdit {
files_to_add: Vec::new(),
files_to_remove: Vec::new(),
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
}),
&[0],
builder.file_purger(),
);

View File

@@ -368,6 +368,38 @@ impl RegionMetaActionList {
pub fn new(actions: Vec<RegionMetaAction>) -> Self {
Self { actions }
}
pub fn into_region_edit(self) -> RegionEdit {
let mut edit = RegionEdit {
files_to_add: Vec::new(),
files_to_remove: Vec::new(),
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
};
for action in self.actions {
if let RegionMetaAction::Edit(region_edit) = action {
// Merge file adds/removes
edit.files_to_add.extend(region_edit.files_to_add);
edit.files_to_remove.extend(region_edit.files_to_remove);
// Max of flushed entry id / sequence
if let Some(eid) = region_edit.flushed_entry_id {
edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
}
if let Some(seq) = region_edit.flushed_sequence {
edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
}
// Prefer the latest non-none time window
if region_edit.compaction_time_window.is_some() {
edit.compaction_time_window = region_edit.compaction_time_window;
}
}
}
edit
}
}
impl RegionMetaActionList {

View File

@@ -621,13 +621,18 @@ impl MitoRegion {
// Submit merged actions using the manifest manager's update method
// Pass the target state (Writable) so it saves to normal directory, not staging
let target_state = RegionRoleState::Leader(RegionLeaderState::Writable);
let new_version = manager.update(merged_actions, target_state).await?;
let new_version = manager.update(merged_actions.clone(), target_state).await?;
info!(
"Successfully submitted merged staged manifests for region {}, new version: {}",
self.region_id, new_version
);
// Apply the merged changes to in-memory version control
let merged_edit = merged_actions.into_region_edit();
self.version_control
.apply_edit(Some(merged_edit), &[], self.file_purger.clone());
// Clear all staging manifests and transit state
manager.store().clear_staging_manifests().await?;
self.exit_staging()?;
@@ -782,7 +787,7 @@ impl ManifestContext {
/// Sets the [`RegionRole`].
///
/// ```
/// ```text
/// +------------------------------------------+
/// | +-----------------+ |
/// | | | |

View File

@@ -127,19 +127,22 @@ impl VersionControl {
}
/// Apply edit to current version.
///
/// If `edit` is None, only removes the specified memtables.
pub(crate) fn apply_edit(
&self,
edit: RegionEdit,
edit: Option<RegionEdit>,
memtables_to_remove: &[MemtableId],
purger: FilePurgerRef,
) {
let version = self.current().version;
let new_version = Arc::new(
VersionBuilder::from_version(version)
.apply_edit(edit, purger)
.remove_memtables(memtables_to_remove)
.build(),
);
let builder = VersionBuilder::from_version(version);
let builder = if let Some(edit) = edit {
builder.apply_edit(edit, purger)
} else {
builder
};
let new_version = Arc::new(builder.remove_memtables(memtables_to_remove).build());
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;

View File

@@ -204,14 +204,14 @@ pub(crate) fn apply_edit(
.collect();
version_control.apply_edit(
RegionEdit {
Some(RegionEdit {
files_to_add,
files_to_remove: files_to_remove.to_vec(),
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
}),
&[],
purger,
);

View File

@@ -74,9 +74,11 @@ impl<S> RegionWorkerLoop<S> {
};
region.update_compaction_millis();
region
.version_control
.apply_edit(request.edit.clone(), &[], region.file_purger.clone());
region.version_control.apply_edit(
Some(request.edit.clone()),
&[],
region.file_purger.clone(),
);
// compaction finished.
request.on_success();

View File

@@ -215,9 +215,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
"Skipping region metadata update for region {} in staging mode",
region_id
);
region.version_control.apply_edit(
None,
&request.memtables_to_remove,
region.file_purger.clone(),
);
} else {
region.version_control.apply_edit(
request.edit.clone(),
Some(request.edit.clone()),
&request.memtables_to_remove,
region.file_purger.clone(),
);

View File

@@ -262,9 +262,11 @@ impl<S> RegionWorkerLoop<S> {
if edit_result.result.is_ok() {
// Applies the edit to the region.
region
.version_control
.apply_edit(edit_result.edit, &[], region.file_purger.clone());
region.version_control.apply_edit(
Some(edit_result.edit),
&[],
region.file_purger.clone(),
);
}
// Sets the region as writable.