diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index a883c53106..137a9e9b5f 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -17,11 +17,12 @@ use std::fs; use api::v1::Rows; +use common_recordbatch::RecordBatches; use store_api::region_engine::{RegionEngine, SettableRegionRoleState}; use store_api::region_request::{ RegionAlterRequest, RegionFlushRequest, RegionRequest, RegionTruncateRequest, }; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::region::{RegionLeaderState, RegionRoleState}; @@ -329,9 +330,10 @@ async fn test_staging_exit_success_with_manifests() { .unwrap() .collect::, _>>() .unwrap(); - assert!( - !staging_files_before.is_empty(), - "Staging manifest directory should contain files before exit" + assert_eq!( + staging_files_before.len(), + 2, + "Staging manifest directory should contain two files before exit" ); // Count normal manifest files before exit @@ -341,6 +343,31 @@ async fn test_staging_exit_success_with_manifests() { .collect::, _>>() .unwrap(); let normal_count_before = normal_files_before.len(); + assert_eq!( + normal_count_before, 1, + "Normal manifest directory should initially contain one file" + ); + + // Try read data before exiting staging, SST files should be invisible + let request = ScanRequest::default(); + let scanner = engine.scanner(region_id, request).await.unwrap(); + assert_eq!( + scanner.num_files(), + 0, + "No SST files should be scanned before exit" + ); + assert_eq!( + scanner.num_memtables(), + 0, + "Memtables should be removed in staging before exit" + ); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum(); + assert_eq!( + total_rows, 0, + "No data should be readable before exit staging mode" + ); // Exit staging mode successfully engine @@ -374,4 +401,27 @@ async fn test_staging_exit_success_with_manifests() { normal_files_after.len() > normal_count_before, "Normal manifest directory should contain more files after merge" ); + + // Validate in-memory version reflects merged manifests (files visible in levels) + let version = region.version(); + let levels = version.ssts.levels(); + assert!( + !levels.is_empty() && !levels[0].files.is_empty(), + "SST levels should have files after exiting staging" + ); + + // Also ensure scanner behavior reflects 2 SSTs + let request = ScanRequest::default(); + let scanner = engine.scanner(region_id, request).await.unwrap(); + assert_eq!( + scanner.num_files(), + 2, + "SST files should be scanned after exit" + ); + + // Try reading data via scanner to ensure previous staged data is actually readable + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum(); + assert_eq!(total_rows, 10, "Expected to read all staged rows"); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 0c8e40dda5..cc40b6fed0 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -1012,14 +1012,14 @@ mod tests { .collect(); // Assumes the flush job is finished. version_control.apply_edit( - RegionEdit { + Some(RegionEdit { files_to_add: Vec::new(), files_to_remove: Vec::new(), timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, - }, + }), &[0], builder.file_purger(), ); diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index c4de57896c..85b8677b0a 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -368,6 +368,38 @@ impl RegionMetaActionList { pub fn new(actions: Vec) -> Self { Self { actions } } + + pub fn into_region_edit(self) -> RegionEdit { + let mut edit = RegionEdit { + files_to_add: Vec::new(), + files_to_remove: Vec::new(), + timestamp_ms: None, + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }; + + for action in self.actions { + if let RegionMetaAction::Edit(region_edit) = action { + // Merge file adds/removes + edit.files_to_add.extend(region_edit.files_to_add); + edit.files_to_remove.extend(region_edit.files_to_remove); + // Max of flushed entry id / sequence + if let Some(eid) = region_edit.flushed_entry_id { + edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid))); + } + if let Some(seq) = region_edit.flushed_sequence { + edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq))); + } + // Prefer the latest non-none time window + if region_edit.compaction_time_window.is_some() { + edit.compaction_time_window = region_edit.compaction_time_window; + } + } + } + + edit + } } impl RegionMetaActionList { diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index b657b107ec..e280ebe493 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -621,13 +621,18 @@ impl MitoRegion { // Submit merged actions using the manifest manager's update method // Pass the target state (Writable) so it saves to normal directory, not staging let target_state = RegionRoleState::Leader(RegionLeaderState::Writable); - let new_version = manager.update(merged_actions, target_state).await?; + let new_version = manager.update(merged_actions.clone(), target_state).await?; info!( "Successfully submitted merged staged manifests for region {}, new version: {}", self.region_id, new_version ); + // Apply the merged changes to in-memory version control + let merged_edit = merged_actions.into_region_edit(); + self.version_control + .apply_edit(Some(merged_edit), &[], self.file_purger.clone()); + // Clear all staging manifests and transit state manager.store().clear_staging_manifests().await?; self.exit_staging()?; @@ -782,7 +787,7 @@ impl ManifestContext { /// Sets the [`RegionRole`]. /// - /// ``` + /// ```text /// +------------------------------------------+ /// | +-----------------+ | /// | | | | diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 6a62219855..e37ed743ce 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -127,19 +127,22 @@ impl VersionControl { } /// Apply edit to current version. + /// + /// If `edit` is None, only removes the specified memtables. pub(crate) fn apply_edit( &self, - edit: RegionEdit, + edit: Option, memtables_to_remove: &[MemtableId], purger: FilePurgerRef, ) { let version = self.current().version; - let new_version = Arc::new( - VersionBuilder::from_version(version) - .apply_edit(edit, purger) - .remove_memtables(memtables_to_remove) - .build(), - ); + let builder = VersionBuilder::from_version(version); + let builder = if let Some(edit) = edit { + builder.apply_edit(edit, purger) + } else { + builder + }; + let new_version = Arc::new(builder.remove_memtables(memtables_to_remove).build()); let mut version_data = self.data.write().unwrap(); version_data.version = new_version; diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index d023e50f53..16fe0eaa12 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -204,14 +204,14 @@ pub(crate) fn apply_edit( .collect(); version_control.apply_edit( - RegionEdit { + Some(RegionEdit { files_to_add, files_to_remove: files_to_remove.to_vec(), timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, - }, + }), &[], purger, ); diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index b74819a733..b9aa7baa38 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -74,9 +74,11 @@ impl RegionWorkerLoop { }; region.update_compaction_millis(); - region - .version_control - .apply_edit(request.edit.clone(), &[], region.file_purger.clone()); + region.version_control.apply_edit( + Some(request.edit.clone()), + &[], + region.file_purger.clone(), + ); // compaction finished. request.on_success(); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index aabee2db4f..336d198ce5 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -215,9 +215,14 @@ impl RegionWorkerLoop { "Skipping region metadata update for region {} in staging mode", region_id ); + region.version_control.apply_edit( + None, + &request.memtables_to_remove, + region.file_purger.clone(), + ); } else { region.version_control.apply_edit( - request.edit.clone(), + Some(request.edit.clone()), &request.memtables_to_remove, region.file_purger.clone(), ); diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 6e897be971..4fb32befed 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -262,9 +262,11 @@ impl RegionWorkerLoop { if edit_result.result.is_ok() { // Applies the edit to the region. - region - .version_control - .apply_edit(edit_result.edit, &[], region.file_purger.clone()); + region.version_control.apply_edit( + Some(edit_result.edit), + &[], + region.file_purger.clone(), + ); } // Sets the region as writable.