diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index ef60fea1cb..5c67f482f1 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -239,10 +239,10 @@ impl AccessLayer { .write_and_upload_sst( request, SstUploadRequest { - dest_path_provider: RegionFilePathFactory { - table_dir: self.table_dir.clone(), - path_type: self.path_type, - }, + dest_path_provider: RegionFilePathFactory::new( + self.table_dir.clone(), + self.path_type, + ), remote_store: self.object_store.clone(), }, write_opts, diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index 6ecdce63f5..c22ddf92a9 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -14,13 +14,19 @@ //! Integration tests for staging state functionality. -use store_api::region_engine::RegionEngine; -use store_api::region_request::{RegionAlterRequest, RegionRequest, RegionTruncateRequest}; +use std::fs; + +use api::v1::Rows; +use store_api::region_engine::{RegionEngine, SettableRegionRoleState}; +use store_api::region_request::{ + RegionAlterRequest, RegionFlushRequest, RegionRequest, RegionTruncateRequest, +}; use store_api::storage::RegionId; use crate::config::MitoConfig; +use crate::region::{RegionLeaderState, RegionRoleState}; use crate::request::WorkerRequest; -use crate::test_util::{CreateRequestBuilder, TestEnv}; +use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; #[tokio::test] async fn test_staging_state_integration() { @@ -127,8 +133,6 @@ async fn test_staging_blocks_truncate_operations() { #[tokio::test] async fn test_staging_state_validation_patterns() { - use crate::region::{RegionLeaderState, RegionRoleState}; - // Test the state validation patterns used throughout the codebase let staging_state = RegionRoleState::Leader(RegionLeaderState::Staging); let writable_state = RegionRoleState::Leader(RegionLeaderState::Writable); @@ -178,3 +182,84 @@ 
async fn test_staging_state_validation_patterns() { "Writable regions should be flushable" ); } + +#[tokio::test] +async fn test_staging_manifest_directory() { + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1024, 0); + let request = CreateRequestBuilder::new().build(); + + // Get column schemas before consuming the request + let column_schemas = rows_schema(&request); + + // Check manifest files after region creation (before staging mode) + // Create region + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Check that manifest files exist after region creation + let data_home = env.data_home(); + + let region_dir = format!("{}/data/test/1024_0000000000", data_home.display()); + let normal_manifest_dir = format!("{}/manifest", region_dir); + assert!( + fs::metadata(&normal_manifest_dir).is_ok(), + "Normal manifest directory should exist" + ); + + // Now test staging mode manifest creation + // Set region to staging mode using the engine API + engine + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader) + .await + .unwrap(); + + // Put some data and flush in staging mode + let rows_data = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows_data).await; + + // Force flush to generate manifest files in staging mode + engine + .handle_request( + region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + // Check that manifest files are in staging directory + let staging_manifest_dir = format!("{}/staging/manifest", region_dir); + assert!( + fs::metadata(&staging_manifest_dir).is_ok(), + "Staging manifest directory should exist" + ); + + // Check what exists in normal manifest directory + let files: Vec<_> = fs::read_dir(&normal_manifest_dir) + .unwrap() + .collect::<Result<Vec<_>, _>>() + .unwrap(); + 
assert!( + !files.is_empty(), + "Normal manifest directory should contain files" + ); + + // Check what exists in staging manifest directory + let staging_files = fs::read_dir(&staging_manifest_dir) + .unwrap() + .collect::<Result<Vec<_>, _>>() + .unwrap(); + assert!( + !staging_files.is_empty(), + "Staging manifest directory should contain files" + ); +} diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index dee6901c9a..ff20b7af4c 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -43,7 +43,7 @@ use crate::read::scan_region::PredicateGroup; use crate::read::Source; use crate::region::options::{IndexOptions, MergeMode}; use crate::region::version::{VersionControlData, VersionControlRef}; -use crate::region::{ManifestContextRef, RegionLeaderState}; +use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState}; use crate::request::{ BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime, @@ -458,7 +458,13 @@ impl RegionFlushTask { let expected_state = if matches!(self.reason, FlushReason::Downgrading) { RegionLeaderState::Downgrading } else { - RegionLeaderState::Writable + // Check if region is in staging mode + let current_state = self.manifest_ctx.current_state(); + if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) { + RegionLeaderState::Staging + } else { + RegionLeaderState::Writable + } }; // We will leak files if the manifest update fails, but we ignore them for simplicity. We can // add a cleanup job to remove them later. 
diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 7d4e8e34da..e765528b66 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -35,9 +35,7 @@ use crate::manifest::storage::{ file_version, is_checkpoint_file, is_delta_file, ManifestObjectStore, }; use crate::metrics::MANIFEST_OP_ELAPSED; -#[cfg(test)] -use crate::region::RegionLeaderState; -use crate::region::RegionRoleState; +use crate::region::{RegionLeaderState, RegionRoleState}; /// Options for [RegionManifestManager]. #[derive(Debug, Clone)] @@ -167,7 +165,9 @@ impl RegionManifestManager { // Persist region change. let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata })); - store.save(version, &action_list.encode()?).await?; + // New region is not in staging mode. + // TODO(ruihang): add staging mode support if needed. + store.save(version, &action_list.encode()?, false).await?; let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION); manifest_version.store(version, Ordering::Relaxed); @@ -454,7 +454,10 @@ impl RegionManifestManager { ); let version = self.increase_version(); - self.store.save(version, &action_list.encode()?).await?; + let is_staging = region_state == RegionRoleState::Leader(RegionLeaderState::Staging); + self.store + .save(version, &action_list.encode()?, is_staging) + .await?; let mut manifest_builder = RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone())); diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 7394f03ed7..e2d0572958 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -134,6 +134,7 @@ pub struct ManifestObjectStore { object_store: ObjectStore, compress_type: CompressionType, path: String, + staging_path: String, /// Stores the size of each manifest file. 
manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>, total_manifest_size: Arc<AtomicU64>, @@ -146,18 +147,30 @@ impl ManifestObjectStore { compress_type: CompressionType, total_manifest_size: Arc<AtomicU64>, ) -> Self { + let path = util::normalize_dir(path); + let staging_path = { + // Convert "region_dir/manifest/" to "region_dir/staging/manifest/" + let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/'); + util::normalize_dir(&format!("{}/staging/manifest", parent_dir)) + }; Self { object_store, compress_type, - path: util::normalize_dir(path), + path, + staging_path, manifest_size_map: Arc::new(RwLock::new(HashMap::new())), total_manifest_size, } } /// Returns the delta file path under the **current** compression algorithm - fn delta_file_path(&self, version: ManifestVersion) -> String { - gen_path(&self.path, &delta_file(version), self.compress_type) + fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String { + let base_path = if is_staging { + &self.staging_path + } else { + &self.path + }; + gen_path(base_path, &delta_file(version), self.compress_type) } /// Returns the checkpoint file path under the **current** compression algorithm @@ -390,8 +403,13 @@ impl ManifestObjectStore { } /// Save the delta manifest file. 
- pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { - let path = self.delta_file_path(version); + pub async fn save( + &mut self, + version: ManifestVersion, + bytes: &[u8], + is_staging: bool, + ) -> Result<()> { + let path = self.delta_file_path(version, is_staging); debug!("Save log to manifest storage, version: {}", version); let data = self .compress_type @@ -724,7 +742,7 @@ mod tests { async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) { for v in 0..5 { log_store - .save(v, format!("hello, {v}").as_bytes()) + .save(v, format!("hello, {v}").as_bytes(), false) .await .unwrap(); } @@ -797,7 +815,7 @@ mod tests { log_store.compress_type = CompressionType::Uncompressed; for v in 0..5 { log_store - .save(v, format!("hello, {v}").as_bytes()) + .save(v, format!("hello, {v}").as_bytes(), false) .await .unwrap(); } @@ -817,7 +835,7 @@ mod tests { // write compressed data to stimulate compress algorithm take effect for v in 5..10 { log_store - .save(v, format!("hello, {v}").as_bytes()) + .save(v, format!("hello, {v}").as_bytes(), false) .await .unwrap(); } @@ -873,7 +891,7 @@ mod tests { log_store.compress_type = CompressionType::Uncompressed; for v in 0..5 { log_store - .save(v, format!("hello, {v}").as_bytes()) + .save(v, format!("hello, {v}").as_bytes(), false) .await .unwrap(); } @@ -912,7 +930,7 @@ mod tests { // write 5 manifest files for v in 0..5 { log_store - .save(v, format!("hello, {v}").as_bytes()) + .save(v, format!("hello, {v}").as_bytes(), false) .await .unwrap(); } diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 7f7d5e3e15..b676ecf773 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -209,11 +209,22 @@ impl RegionWorkerLoop { } }; - region.version_control.apply_edit( - request.edit.clone(), - &request.memtables_to_remove, - region.file_purger.clone(), - ); + // Check if region is currently in staging mode 
+ let is_staging = region.manifest_ctx.current_state() + == crate::region::RegionRoleState::Leader(crate::region::RegionLeaderState::Staging); + + if is_staging { + info!( + "Skipping region metadata update for region {} in staging mode", + region_id + ); + } else { + region.version_control.apply_edit( + request.edit.clone(), + &request.memtables_to_remove, + region.file_purger.clone(), + ); + } region.update_flush_millis(); diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index d5fe42cbf5..68d8152d5a 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -242,7 +242,8 @@ impl RegionWorkerLoop { continue; }; match region.state() { - RegionRoleState::Leader(RegionLeaderState::Writable) => { + RegionRoleState::Leader(RegionLeaderState::Writable) + | RegionRoleState::Leader(RegionLeaderState::Staging) => { let region_ctx = RegionWriteCtx::new( region.region_id, ®ion.version_control,