diff --git a/Cargo.lock b/Cargo.lock index c8f8faedbf..02f99d7290 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5466,7 +5466,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=520fa524f9d590752ea327683e82ffd65721b27c#520fa524f9d590752ea327683e82ffd65721b27c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a2e5099d72a1cfa8ba41fa4296101eb5f874074a#a2e5099d72a1cfa8ba41fa4296101eb5f874074a" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index e0e2701f85..594002bedb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "520fa524f9d590752ea327683e82ffd65721b27c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a2e5099d72a1cfa8ba41fa4296101eb5f874074a" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 4ac52683cf..a3c818c015 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -126,7 +126,7 @@ use crate::config::MitoConfig; use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin}; use crate::error::{ InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, - SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu, + SerdeJsonSnafu, SerializeColumnMetadataSnafu, }; #[cfg(feature = "enterprise")] use crate::extension::BoxedExtensionRangeProviderFactory; @@ -1057,19 +1057,8 @@ impl EngineInner { let region_id = request.region_id; let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?; self.workers.submit_to_worker(region_id, request).await?; - let manifests = receiver.await.context(RecvSnafu)??; - - let new_manifests = manifests - .into_iter() - .map(|(region_id, manifest)| { - Ok(( - region_id, - serde_json::to_string(&manifest) - .context(SerializeManifestSnafu { region_id })?, - )) - }) - .collect::>>()?; - Ok(RemapManifestsResponse { new_manifests }) + let manifest_paths = receiver.await.context(RecvSnafu)??; + Ok(RemapManifestsResponse { manifest_paths }) } async fn copy_region_from( diff --git a/src/mito2/src/engine/apply_staging_manifest_test.rs b/src/mito2/src/engine/apply_staging_manifest_test.rs index c5048b1c4b..d4fc7f7cb7 100644 --- a/src/mito2/src/engine/apply_staging_manifest_test.rs +++ b/src/mito2/src/engine/apply_staging_manifest_test.rs @@ -69,7 +69,8 @@ async fn test_apply_staging_manifest_invalid_region_state_with_format(flat_forma region_id, RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), - files_to_add: vec![], + central_region_id: RegionId::new(1, 0), + manifest_path: "manifest.json".to_string(), }), ) .await @@ -88,7 +89,8 @@ async fn test_apply_staging_manifest_invalid_region_state_with_format(flat_forma region_id, RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), - files_to_add: vec![], + central_region_id: RegionId::new(1, 0), + manifest_path: "manifest.json".to_string(), }), ) .await @@ -136,7 +138,8 @@ async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_ region_id, RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), - files_to_add: vec![], + central_region_id: RegionId::new(1, 0), + manifest_path: "dummy".to_string(), }), ) .await @@ -144,7 +147,36 @@ async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_ assert_matches!( err.into_inner().as_any().downcast_ref::().unwrap(), Error::StagingPartitionExprMismatch { .. } - ) + ); + + // If staging manifest's partition expr is different from the request. + let result = engine + .remap_manifests(RemapManifestsRequest { + region_id, + input_regions: vec![region_id], + region_mapping: [(region_id, vec![region_id])].into_iter().collect(), + new_partition_exprs: [(region_id, range_expr("x", 0, 49).as_json_str().unwrap())] + .into_iter() + .collect(), + }) + .await + .unwrap(); + + let err = engine + .handle_request( + region_id, + RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { + partition_expr: range_expr("x", 0, 50).as_json_str().unwrap(), + central_region_id: region_id, + manifest_path: result.manifest_paths[®ion_id].clone(), + }), + ) + .await + .unwrap_err(); + assert_matches!( + err.into_inner().as_any().downcast_ref::().unwrap(), + Error::StagingPartitionExprMismatch { .. } + ); } #[tokio::test] @@ -216,13 +248,26 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) { }) .await .unwrap(); - assert_eq!(result.new_manifests.len(), 2); - let new_manifest_1 = - serde_json::from_str::(&result.new_manifests[&new_region_id_1]).unwrap(); - let new_manifest_2 = - serde_json::from_str::(&result.new_manifests[&new_region_id_2]).unwrap(); + let region = engine.get_region(region_id).unwrap(); + let manager = region.manifest_ctx.manifest_manager.write().await; + let manifest_storage = manager.store(); + let blob_store = manifest_storage.staging_storage().blob_storage(); + + assert_eq!(result.manifest_paths.len(), 2); + common_telemetry::debug!("manifest paths: {:?}", result.manifest_paths); + let new_manifest_1 = blob_store + .get(&result.manifest_paths[&new_region_id_1]) + .await + .unwrap(); + let new_manifest_2 = blob_store + .get(&result.manifest_paths[&new_region_id_2]) + .await + .unwrap(); + let new_manifest_1 = serde_json::from_slice::(&new_manifest_1).unwrap(); + let new_manifest_2 = serde_json::from_slice::(&new_manifest_2).unwrap(); assert_eq!(new_manifest_1.files.len(), 3); assert_eq!(new_manifest_2.files.len(), 3); + drop(manager); let request = CreateRequestBuilder::new().build(); engine @@ -238,7 +283,6 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) { ) .await .unwrap(); - let mut files_to_add = new_manifest_1.files.values().cloned().collect::>(); // Before apply staging manifest, the files should be empty let region = engine.get_region(new_region_id_1).unwrap(); let manifest = region.manifest_ctx.manifest().await; @@ -251,7 +295,8 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) { new_region_id_1, RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(), - files_to_add: serde_json::to_vec(&files_to_add).unwrap(), + central_region_id: region_id, + manifest_path: result.manifest_paths[&new_region_id_1].clone(), }), ) .await @@ -277,23 +322,52 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) { let region_dir = format!("{}/data/test/1_0000000001", data_home.display()); let staging_manifest_dir = format!("{}/staging/manifest", region_dir); let staging_files = fs::read_dir(&staging_manifest_dir) - .map(|entries| entries.collect::, _>>().unwrap_or_default()) + .map(|entries| { + entries + .filter(|e| e.as_ref().unwrap().metadata().unwrap().is_file()) + .collect::, _>>() + .unwrap_or_default() + }) .unwrap_or_default(); - assert_eq!(staging_files.len(), 0); + assert_eq!(staging_files.len(), 0, "staging_files: {:?}", staging_files); + + let region = engine.get_region(region_id).unwrap(); + let manager = region.manifest_ctx.manifest_manager.write().await; + let manifest_storage = manager.store(); + let blob_store = manifest_storage.staging_storage().blob_storage(); + + let new_manifest_1 = blob_store + .get(&result.manifest_paths[&new_region_id_1]) + .await + .unwrap(); + let mut new_manifest_1 = serde_json::from_slice::(&new_manifest_1).unwrap(); // Try to modify the file sequence. - files_to_add.push(FileMeta { - region_id, - file_id: FileId::random(), - ..Default::default() - }); + let file_id = FileId::random(); + new_manifest_1.files.insert( + file_id, + FileMeta { + region_id, + file_id, + ..Default::default() + }, + ); + blob_store + .put( + &result.manifest_paths[&new_region_id_1], + serde_json::to_vec(&new_manifest_1).unwrap(), + ) + .await + .unwrap(); + drop(manager); // This request will be ignored. engine .handle_request( new_region_id_1, RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(), - files_to_add: serde_json::to_vec(&files_to_add).unwrap(), + central_region_id: region_id, + manifest_path: result.manifest_paths[&new_region_id_1].clone(), }), ) .await @@ -334,12 +408,40 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma ) .await .unwrap(); + // Apply staging manifest with not exists manifest path. let err = engine .handle_request( region_id, RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(), - files_to_add: b"invalid".to_vec(), + central_region_id: RegionId::new(1, 0), + manifest_path: "dummy".to_string(), + }), + ) + .await + .unwrap_err(); + assert_matches!( + err.into_inner().as_any().downcast_ref::().unwrap(), + Error::OpenDal { .. } + ); + + // Apply staging manifest with invalid bytes. + let region = engine.get_region(region_id).unwrap(); + let manager = region.manifest_ctx.manifest_manager.write().await; + let manifest_storage = manager.store(); + let blob_store = manifest_storage.staging_storage().blob_storage(); + blob_store + .put("invalid_bytes", b"invalid_bytes".to_vec()) + .await + .unwrap(); + drop(manager); + let err = engine + .handle_request( + region_id, + RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { + partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(), + central_region_id: region_id, + manifest_path: "invalid_bytes".to_string(), }), ) .await @@ -349,52 +451,3 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma Error::SerdeJson { .. } ); } - -#[tokio::test] -async fn test_apply_staging_manifest_empty_files() { - common_telemetry::init_default_ut_logging(); - test_apply_staging_manifest_empty_files_with_format(false).await; - test_apply_staging_manifest_empty_files_with_format(true).await; -} - -async fn test_apply_staging_manifest_empty_files_with_format(flat_format: bool) { - let mut env = TestEnv::with_prefix("empty-files").await; - let engine = env - .create_engine(MitoConfig { - default_experimental_flat_format: flat_format, - ..Default::default() - }) - .await; - let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - engine - .handle_request( - region_id, - RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(), - }), - ) - .await - .unwrap(); - engine - .handle_request( - region_id, - RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { - partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(), - files_to_add: serde_json::to_vec::>(&vec![]).unwrap(), - }), - ) - .await - .unwrap(); - let region = engine.get_region(region_id).unwrap(); - let manifest = region.manifest_ctx.manifest().await; - assert_eq!(manifest.files.len(), 0); - let staging_manifest = region.manifest_ctx.staging_manifest().await; - assert!(staging_manifest.is_none()); - let staging_partition_expr = region.staging_partition_expr.lock().unwrap(); - assert!(staging_partition_expr.is_none()); -} diff --git a/src/mito2/src/engine/remap_manifests_test.rs b/src/mito2/src/engine/remap_manifests_test.rs index bd38e87e2a..e3538401aa 100644 --- a/src/mito2/src/engine/remap_manifests_test.rs +++ b/src/mito2/src/engine/remap_manifests_test.rs @@ -229,11 +229,23 @@ async fn test_remap_manifests_success_with_format(flat_format: bool) { }) .await .unwrap(); - assert_eq!(result.new_manifests.len(), 2); - let new_manifest_1 = - serde_json::from_str::(&result.new_manifests[&new_region_id_1]).unwrap(); - let new_manifest_2 = - serde_json::from_str::(&result.new_manifests[&new_region_id_2]).unwrap(); + let region = engine.get_region(region_id).unwrap(); + let manager = region.manifest_ctx.manifest_manager.write().await; + let manifest_storage = manager.store(); + let blob_store = manifest_storage.staging_storage().blob_storage(); + + assert_eq!(result.manifest_paths.len(), 2); + common_telemetry::debug!("manifest paths: {:?}", result.manifest_paths); + let new_manifest_1 = blob_store + .get(&result.manifest_paths[&new_region_id_1]) + .await + .unwrap(); + let new_manifest_2 = blob_store + .get(&result.manifest_paths[&new_region_id_2]) + .await + .unwrap(); + let new_manifest_1 = serde_json::from_slice::(&new_manifest_1).unwrap(); + let new_manifest_2 = serde_json::from_slice::(&new_manifest_2).unwrap(); assert_eq!(new_manifest_1.files.len(), 3); assert_eq!(new_manifest_2.files.len(), 3); } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index bebab52de1..6013c0393f 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -378,6 +378,11 @@ impl ManifestObjectStore { pub async fn clear_staging_manifests(&mut self) -> Result<()> { self.staging_storage.clear().await } + + /// Returns the staging storage. + pub(crate) fn staging_storage(&self) -> &StagingStorage { + &self.staging_storage + } } #[cfg(test)] diff --git a/src/mito2/src/manifest/storage/staging.rs b/src/mito2/src/manifest/storage/staging.rs index 7a00349be9..82fbee2246 100644 --- a/src/mito2/src/manifest/storage/staging.rs +++ b/src/mito2/src/manifest/storage/staging.rs @@ -26,20 +26,104 @@ use crate::manifest::storage::size_tracker::NoopTracker; use crate::manifest::storage::utils::sort_manifests; use crate::manifest::storage::{file_version, is_delta_file}; +/// A simple blob storage for arbitrary binary data in the staging directory. +/// +/// This is primarily used during repartition operations to store generated +/// manifests that will be consumed by other regions via [`ApplyStagingManifestRequest`](store_api::region_request::ApplyStagingManifestRequest). +/// The blobs are stored in `{region_dir}/staging/blob/` directory. +#[derive(Debug, Clone)] +pub(crate) struct StagingBlobStorage { + object_store: ObjectStore, + path: String, +} + +/// Returns the staging path from the blob path. +/// +/// # Example +/// - Input: `"data/table/region_0001/manifest/"` +/// - Output: `"data/table/region_0001/staging/blob/"` +pub fn staging_blob_path(manifest_path: &str) -> String { + let parent_dir = manifest_path + .trim_end_matches("manifest/") + .trim_end_matches('/'); + util::normalize_dir(&format!("{}/staging/blob", parent_dir)) +} + +impl StagingBlobStorage { + pub fn new(path: String, object_store: ObjectStore) -> Self { + let path = util::normalize_dir(&path); + common_telemetry::debug!( + "Staging blob storage path: {}, root: {}", + path, + object_store.info().root() + ); + Self { object_store, path } + } + + /// Put the bytes to the blob storage. + pub async fn put(&self, path: &str, bytes: Vec) -> Result<()> { + let path = format!("{}{}", self.path, path); + common_telemetry::debug!( + "Putting blob to staging blob storage, path: {}, root: {}, bytes: {}", + path, + self.object_store.info().root(), + bytes.len() + ); + self.object_store + .write(&path, bytes) + .await + .context(OpenDalSnafu)?; + Ok(()) + } + + /// Get the bytes from the blob storage. + pub async fn get(&self, path: &str) -> Result> { + let path = format!("{}{}", self.path, path); + common_telemetry::debug!( + "Reading blob from staging blob storage, path: {}, root: {}", + path, + self.object_store.info().root() + ); + let bytes = self.object_store.read(&path).await.context(OpenDalSnafu)?; + + Ok(bytes.to_vec()) + } +} + +/// Storage for staging manifest files and blobs used during repartition operations. +/// +/// Fields: +/// - `delta_storage`: Manages incremental manifest delta files specific to the staging region. +/// - `blob_storage`: Manages arbitrary blobs, such as generated manifests for regions. +/// +/// Directory structure: +/// - `{region_dir}/staging/manifest/` — for incremental manifest delta files for the staging region. +/// - `{region_dir}/staging/blob/` — for arbitrary blobs (e.g., generated region manifests). #[derive(Debug, Clone)] pub(crate) struct StagingStorage { delta_storage: DeltaStorage, + blob_storage: StagingBlobStorage, +} + +/// Returns the staging path from the manifest path. +/// +/// # Example +/// - Input: `"data/table/region_0001/manifest/"` +/// - Output: `"data/table/region_0001/staging/manifest/"` +pub fn staging_manifest_path(manifest_path: &str) -> String { + let parent_dir = manifest_path + .trim_end_matches("manifest/") + .trim_end_matches('/'); + util::normalize_dir(&format!("{}/staging/manifest", parent_dir)) } impl StagingStorage { pub fn new(path: String, object_store: ObjectStore, compress_type: CompressionType) -> Self { - let staging_path = { - // Convert "region_dir/manifest/" to "region_dir/staging/manifest/" - let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/'); - util::normalize_dir(&format!("{}/staging/manifest", parent_dir)) - }; + let staging_blob_path = staging_blob_path(&path); + let blob_storage = StagingBlobStorage::new(staging_blob_path, object_store.clone()); + let staging_manifest_path = staging_manifest_path(&path); let delta_storage = DeltaStorage::new( - staging_path.clone(), + staging_manifest_path.clone(), object_store.clone(), compress_type, // StagingStorage does not use a manifest cache; set to None. @@ -48,7 +132,16 @@ impl StagingStorage { // deleted after exiting staging mode. Arc::new(NoopTracker), ); - Self { delta_storage } + + Self { + delta_storage, + blob_storage, + } + } + + /// Returns the blob storage. + pub(crate) fn blob_storage(&self) -> &StagingBlobStorage { + &self.blob_storage } /// Returns an iterator of manifests from staging directory. @@ -107,3 +200,22 @@ impl StagingStorage { self.delta_storage.set_compress_type(compress_type); } } + +#[cfg(test)] +mod tests { + use crate::manifest::storage::staging::{staging_blob_path, staging_manifest_path}; + + #[test] + fn test_staging_path() { + let path = "/data/table/region_0001/manifest/"; + let expected = "/data/table/region_0001/staging/manifest/"; + assert_eq!(staging_manifest_path(path), expected); + } + + #[test] + fn test_staging_blob_path() { + let path = "/data/table/region_0001/manifest/"; + let expected = "/data/table/region_0001/staging/blob/"; + assert_eq!(staging_blob_path(path), expected); + } +} diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 7c40ee0d23..334c1825b0 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -50,7 +50,7 @@ use crate::error::{ FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu, Result, UnexpectedSnafu, }; -use crate::manifest::action::{RegionEdit, RegionManifest, TruncateKind}; +use crate::manifest::action::{RegionEdit, TruncateKind}; use crate::memtable::MemtableId; use crate::memtable::bulk::part::BulkPart; use crate::metrics::COMPACTION_ELAPSED_TOTAL; @@ -796,10 +796,7 @@ impl WorkerRequest { region_mapping, new_partition_exprs, }: store_api::region_engine::RemapManifestsRequest, - ) -> Result<( - WorkerRequest, - Receiver>>, - )> { + ) -> Result<(WorkerRequest, Receiver>>)> { let (sender, receiver) = oneshot::channel(); let new_partition_exprs = new_partition_exprs .into_iter() @@ -1116,8 +1113,10 @@ pub(crate) struct RemapManifestsRequest { pub(crate) region_mapping: HashMap>, /// New partition expressions for the new regions. pub(crate) new_partition_exprs: HashMap, - /// Result sender. - pub(crate) sender: Sender>>, + /// Sender for the result of the remap operation. + /// + /// The result is a map from region IDs to their corresponding staging manifest paths. + pub(crate) sender: Sender>>, } #[derive(Debug)] diff --git a/src/mito2/src/worker/handle_apply_staging.rs b/src/mito2/src/worker/handle_apply_staging.rs index ed4cc899a0..3fe868d518 100644 --- a/src/mito2/src/worker/handle_apply_staging.rs +++ b/src/mito2/src/worker/handle_apply_staging.rs @@ -21,12 +21,14 @@ use store_api::storage::RegionId; use tokio::sync::oneshot; use crate::error::{ - RegionStateSnafu, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu, + RegionStateSnafu, Result, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu, }; -use crate::manifest::action::RegionEdit; -use crate::region::{RegionLeaderState, RegionRoleState}; -use crate::request::{OptionOutputTx, RegionEditRequest}; -use crate::sst::file::FileMeta; +use crate::manifest::action::{RegionEdit, RegionManifest}; +use crate::manifest::storage::manifest_dir; +use crate::manifest::storage::staging::{StagingBlobStorage, staging_blob_path}; +use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState}; +use crate::request::{OptionOutputTx, RegionEditRequest, WorkerRequest, WorkerRequestWithTime}; +use crate::sst::location::region_dir_from_table_dir; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -86,21 +88,32 @@ impl RegionWorkerLoop { return; } - let (tx, rx) = oneshot::channel(); - let files_to_add = match serde_json::from_slice::>(&request.files_to_add) - .context(SerdeJsonSnafu) - { - Ok(files_to_add) => files_to_add, - Err(e) => { - sender.send(Err(e)); + let worker_sender = self.sender.clone(); + common_runtime::spawn_global(async move { + let staging_manifest = match Self::fetch_staging_manifest( + ®ion, + request.central_region_id, + &request.manifest_path, + ) + .await + { + Ok(staging_manifest) => staging_manifest, + Err(e) => { + sender.send(Err(e)); + return; + } + }; + if staging_manifest.metadata.partition_expr.as_ref() != Some(&request.partition_expr) { + sender.send(Err(StagingPartitionExprMismatchSnafu { + manifest_expr: staging_manifest.metadata.partition_expr.clone(), + request_expr: request.partition_expr, + } + .build())); return; } - }; - info!("Applying staging manifest request to region {}", region_id); - self.handle_region_edit(RegionEditRequest { - region_id, - edit: RegionEdit { + let files_to_add = staging_manifest.files.values().cloned().collect::>(); + let edit = RegionEdit { files_to_add, files_to_remove: vec![], timestamp_ms: Some(Utc::now().timestamp_millis()), @@ -108,11 +121,23 @@ impl RegionWorkerLoop { flushed_entry_id: None, flushed_sequence: None, committed_sequence: None, - }, - tx, - }); + }; + + let (tx, rx) = oneshot::channel(); + info!( + "Applying staging manifest request to region {}", + region.region_id, + ); + let _ = worker_sender + .send(WorkerRequestWithTime::new(WorkerRequest::EditRegion( + RegionEditRequest { + region_id: region.region_id, + edit, + tx, + }, + ))) + .await; - common_runtime::spawn_global(async move { // Await the result from the region edit and forward the outcome to the original sender. // If the operation completes successfully, respond with Ok(0); otherwise, respond with an appropriate error. if let Ok(result) = rx.await { @@ -137,4 +162,25 @@ impl RegionWorkerLoop { } }); } + + /// Fetches the staging manifest from the central region's staging blob storage. + /// + /// The `central_region_id` is used to locate the staging directory because the staging + /// manifest was created by the central region during `remap_manifests` operation. + async fn fetch_staging_manifest( + region: &MitoRegionRef, + central_region_id: RegionId, + manifest_path: &str, + ) -> Result { + let region_dir = + region_dir_from_table_dir(region.table_dir(), central_region_id, region.path_type()); + let staging_blob_path = staging_blob_path(&manifest_dir(®ion_dir)); + let staging_blob_storage = StagingBlobStorage::new( + staging_blob_path, + region.access_layer().object_store().clone(), + ); + let staging_manifest = staging_blob_storage.get(manifest_path).await?; + + serde_json::from_slice::(&staging_manifest).context(SerdeJsonSnafu) + } } diff --git a/src/mito2/src/worker/handle_remap.rs b/src/mito2/src/worker/handle_remap.rs index 5e94221f7d..639b9eef3e 100644 --- a/src/mito2/src/worker/handle_remap.rs +++ b/src/mito2/src/worker/handle_remap.rs @@ -16,14 +16,13 @@ use std::collections::HashMap; use std::time::Instant; use common_error::ext::BoxedError; -use common_telemetry::info; +use common_telemetry::{debug, info}; use futures::future::try_join_all; use partition::expr::PartitionExpr; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result}; -use crate::manifest::action::RegionManifest; +use crate::error::{self, FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result}; use crate::region::{MitoRegionRef, RegionMetadataLoader}; use crate::remap_manifest::RemapManifest; use crate::request::RemapManifestsRequest; @@ -75,13 +74,17 @@ impl RegionWorkerLoop { }); } + // Fetches manifests for input regions, remaps them according to the provided + // mapping and partition expressions. + // + // Returns a map from each new region to its relative staging manifest path. async fn fetch_and_remap_manifests( region: MitoRegionRef, region_metadata_loader: RegionMetadataLoader, input_regions: Vec, new_partition_exprs: HashMap, region_mapping: HashMap>, - ) -> Result> { + ) -> Result> { let mut tasks = Vec::with_capacity(input_regions.len()); let region_options = region.version().options.clone(); let table_dir = region.table_dir(); @@ -97,7 +100,6 @@ impl RegionWorkerLoop { .await }); } - let results = try_join_all(tasks) .await .map_err(BoxedError::new) @@ -112,12 +114,38 @@ impl RegionWorkerLoop { .collect::>>()?; let mut mapper = RemapManifest::new(manifests, new_partition_exprs, region_mapping); let remap_result = mapper.remap_manifests()?; + + // Write new manifests to staging blob storage. + let manifest_manager = region.manifest_ctx.manifest_manager.write().await; + let manifest_storage = manifest_manager.store(); + let staging_blob_storage = manifest_storage.staging_storage().blob_storage().clone(); + let mut tasks = Vec::with_capacity(remap_result.new_manifests.len()); + + for (remap_region_id, manifest) in &remap_result.new_manifests { + let bytes = serde_json::to_vec(&manifest).context(error::SerializeManifestSnafu { + region_id: *remap_region_id, + })?; + let key = remap_manifest_key(remap_region_id); + tasks.push(async { + debug!( + "Putting manifest to staging blob storage, region_id: {}, key: {}", + *remap_region_id, key + ); + staging_blob_storage.put(&key, bytes).await?; + Ok((*remap_region_id, key)) + }); + } + let r = try_join_all(tasks).await?; info!( "Remap manifests cost: {:?}, region: {}", now.elapsed(), region.region_id ); - Ok(remap_result.new_manifests) + Ok(r.into_iter().collect::>()) } } + +fn remap_manifest_key(region_id: &RegionId) -> String { + format!("remap_manifest_{}", region_id.as_u64()) +} diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index a179d1dc14..5d5bf8f4ce 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -759,8 +759,11 @@ pub struct RemapManifestsRequest { /// Response to remap manifests from old regions to new regions. #[derive(Debug, Clone)] pub struct RemapManifestsResponse { - /// The new manifests for the new regions. - pub new_manifests: HashMap, + /// Maps region id to its staging manifest path. + /// + /// These paths are relative paths within the central region's staging blob storage, + /// and should be passed to [`ApplyStagingManifestRequest`](RegionRequest::ApplyStagingManifest) to finalize the repartition. + pub manifest_paths: HashMap, } /// Request to copy files from a source region to a target region. diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 6293adc3b4..c7e069a3c1 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -421,20 +421,17 @@ fn make_region_apply_staging_manifest( api::v1::region::ApplyStagingManifestRequest { region_id, partition_expr, - files_to_add, + central_region_id, + manifest_path, }: api::v1::region::ApplyStagingManifestRequest, ) -> Result> { let region_id = region_id.into(); - let files_to_add = files_to_add - .context(UnexpectedSnafu { - reason: "'files_to_add' field is missing", - })? - .data; Ok(vec![( region_id, RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { partition_expr, - files_to_add, + central_region_id: central_region_id.into(), + manifest_path, }), )]) } @@ -1464,8 +1461,10 @@ pub struct EnterStagingRequest { /// In practice, this means: /// - The `partition_expr` identifies the staging region rule that the manifest /// was generated for. -/// - `files_to_add` carries the serialized metadata (such as file manifests or -/// file lists) that should be attached to the region under the new rule. +/// - `central_region_id` specifies which region holds the staging blob storage +/// where the manifest was written during the `remap_manifests` operation. +/// - `manifest_path` is the relative path within the central region's staging +/// blob storage to fetch the generated manifest. /// /// It should typically be called **after** the staging region has been /// initialized by [`EnterStagingRequest`] and the new file layout has been @@ -1474,8 +1473,11 @@ pub struct EnterStagingRequest { pub struct ApplyStagingManifestRequest { /// The partition expression of the staging region. pub partition_expr: String, - /// The files to add to the region. - pub files_to_add: Vec, + /// The region that stores the staging manifests in its staging blob storage. + pub central_region_id: RegionId, + /// The relative path to the staging manifest within the central region's + /// staging blob storage. + pub manifest_path: String, } impl fmt::Display for RegionRequest {