feat: update ApplyStagingManifestRequest to fetch manifest from central region (#7493)

* feat: update ApplyStagingManifestRequest to fetch manifest from central region

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: refine comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(mito2): rename `StagingDataStorage` to `StagingBlobStorage`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-12-30 15:29:56 +08:00
committed by GitHub
parent 554f3943b6
commit b1d81913f5
12 changed files with 393 additions and 144 deletions

2
Cargo.lock generated
View File

@@ -5466,7 +5466,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=520fa524f9d590752ea327683e82ffd65721b27c#520fa524f9d590752ea327683e82ffd65721b27c"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a2e5099d72a1cfa8ba41fa4296101eb5f874074a#a2e5099d72a1cfa8ba41fa4296101eb5f874074a"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",

View File

@@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "520fa524f9d590752ea327683e82ffd65721b27c" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a2e5099d72a1cfa8ba41fa4296101eb5f874074a" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -126,7 +126,7 @@ use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu,
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
@@ -1057,19 +1057,8 @@ impl EngineInner {
let region_id = request.region_id;
let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?;
self.workers.submit_to_worker(region_id, request).await?;
let manifests = receiver.await.context(RecvSnafu)??;
let new_manifests = manifests
.into_iter()
.map(|(region_id, manifest)| {
Ok((
region_id,
serde_json::to_string(&manifest)
.context(SerializeManifestSnafu { region_id })?,
))
})
.collect::<Result<HashMap<_, _>>>()?;
Ok(RemapManifestsResponse { new_manifests })
let manifest_paths = receiver.await.context(RecvSnafu)??;
Ok(RemapManifestsResponse { manifest_paths })
}
async fn copy_region_from(

View File

@@ -69,7 +69,8 @@ async fn test_apply_staging_manifest_invalid_region_state_with_format(flat_forma
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
files_to_add: vec![],
central_region_id: RegionId::new(1, 0),
manifest_path: "manifest.json".to_string(),
}),
)
.await
@@ -88,7 +89,8 @@ async fn test_apply_staging_manifest_invalid_region_state_with_format(flat_forma
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
files_to_add: vec![],
central_region_id: RegionId::new(1, 0),
manifest_path: "manifest.json".to_string(),
}),
)
.await
@@ -136,7 +138,8 @@ async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
files_to_add: vec![],
central_region_id: RegionId::new(1, 0),
manifest_path: "dummy".to_string(),
}),
)
.await
@@ -144,7 +147,36 @@ async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::StagingPartitionExprMismatch { .. }
)
);
// If staging manifest's partition expr is different from the request.
let result = engine
.remap_manifests(RemapManifestsRequest {
region_id,
input_regions: vec![region_id],
region_mapping: [(region_id, vec![region_id])].into_iter().collect(),
new_partition_exprs: [(region_id, range_expr("x", 0, 49).as_json_str().unwrap())]
.into_iter()
.collect(),
})
.await
.unwrap();
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("x", 0, 50).as_json_str().unwrap(),
central_region_id: region_id,
manifest_path: result.manifest_paths[&region_id].clone(),
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::StagingPartitionExprMismatch { .. }
);
}
#[tokio::test]
@@ -216,13 +248,26 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
})
.await
.unwrap();
assert_eq!(result.new_manifests.len(), 2);
let new_manifest_1 =
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_1]).unwrap();
let new_manifest_2 =
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_2]).unwrap();
let region = engine.get_region(region_id).unwrap();
let manager = region.manifest_ctx.manifest_manager.write().await;
let manifest_storage = manager.store();
let blob_store = manifest_storage.staging_storage().blob_storage();
assert_eq!(result.manifest_paths.len(), 2);
common_telemetry::debug!("manifest paths: {:?}", result.manifest_paths);
let new_manifest_1 = blob_store
.get(&result.manifest_paths[&new_region_id_1])
.await
.unwrap();
let new_manifest_2 = blob_store
.get(&result.manifest_paths[&new_region_id_2])
.await
.unwrap();
let new_manifest_1 = serde_json::from_slice::<RegionManifest>(&new_manifest_1).unwrap();
let new_manifest_2 = serde_json::from_slice::<RegionManifest>(&new_manifest_2).unwrap();
assert_eq!(new_manifest_1.files.len(), 3);
assert_eq!(new_manifest_2.files.len(), 3);
drop(manager);
let request = CreateRequestBuilder::new().build();
engine
@@ -238,7 +283,6 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
)
.await
.unwrap();
let mut files_to_add = new_manifest_1.files.values().cloned().collect::<Vec<_>>();
// Before apply staging manifest, the files should be empty
let region = engine.get_region(new_region_id_1).unwrap();
let manifest = region.manifest_ctx.manifest().await;
@@ -251,7 +295,8 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
new_region_id_1,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: serde_json::to_vec(&files_to_add).unwrap(),
central_region_id: region_id,
manifest_path: result.manifest_paths[&new_region_id_1].clone(),
}),
)
.await
@@ -277,23 +322,52 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
let region_dir = format!("{}/data/test/1_0000000001", data_home.display());
let staging_manifest_dir = format!("{}/staging/manifest", region_dir);
let staging_files = fs::read_dir(&staging_manifest_dir)
.map(|entries| entries.collect::<Result<Vec<_>, _>>().unwrap_or_default())
.map(|entries| {
entries
.filter(|e| e.as_ref().unwrap().metadata().unwrap().is_file())
.collect::<Result<Vec<_>, _>>()
.unwrap_or_default()
})
.unwrap_or_default();
assert_eq!(staging_files.len(), 0);
assert_eq!(staging_files.len(), 0, "staging_files: {:?}", staging_files);
let region = engine.get_region(region_id).unwrap();
let manager = region.manifest_ctx.manifest_manager.write().await;
let manifest_storage = manager.store();
let blob_store = manifest_storage.staging_storage().blob_storage();
let new_manifest_1 = blob_store
.get(&result.manifest_paths[&new_region_id_1])
.await
.unwrap();
let mut new_manifest_1 = serde_json::from_slice::<RegionManifest>(&new_manifest_1).unwrap();
// Try to modify the file sequence.
files_to_add.push(FileMeta {
region_id,
file_id: FileId::random(),
..Default::default()
});
let file_id = FileId::random();
new_manifest_1.files.insert(
file_id,
FileMeta {
region_id,
file_id,
..Default::default()
},
);
blob_store
.put(
&result.manifest_paths[&new_region_id_1],
serde_json::to_vec(&new_manifest_1).unwrap(),
)
.await
.unwrap();
drop(manager);
// This request will be ignored.
engine
.handle_request(
new_region_id_1,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: serde_json::to_vec(&files_to_add).unwrap(),
central_region_id: region_id,
manifest_path: result.manifest_paths[&new_region_id_1].clone(),
}),
)
.await
@@ -334,12 +408,40 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma
)
.await
.unwrap();
// Apply staging manifest with not exists manifest path.
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: b"invalid".to_vec(),
central_region_id: RegionId::new(1, 0),
manifest_path: "dummy".to_string(),
}),
)
.await
.unwrap_err();
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::OpenDal { .. }
);
// Apply staging manifest with invalid bytes.
let region = engine.get_region(region_id).unwrap();
let manager = region.manifest_ctx.manifest_manager.write().await;
let manifest_storage = manager.store();
let blob_store = manifest_storage.staging_storage().blob_storage();
blob_store
.put("invalid_bytes", b"invalid_bytes".to_vec())
.await
.unwrap();
drop(manager);
let err = engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
central_region_id: region_id,
manifest_path: "invalid_bytes".to_string(),
}),
)
.await
@@ -349,52 +451,3 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma
Error::SerdeJson { .. }
);
}
#[tokio::test]
async fn test_apply_staging_manifest_empty_files() {
common_telemetry::init_default_ut_logging();
test_apply_staging_manifest_empty_files_with_format(false).await;
test_apply_staging_manifest_empty_files_with_format(true).await;
}
async fn test_apply_staging_manifest_empty_files_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("empty-files").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
}),
)
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
files_to_add: serde_json::to_vec::<Vec<FileMeta>>(&vec![]).unwrap(),
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 0);
let staging_manifest = region.manifest_ctx.staging_manifest().await;
assert!(staging_manifest.is_none());
let staging_partition_expr = region.staging_partition_expr.lock().unwrap();
assert!(staging_partition_expr.is_none());
}

View File

@@ -229,11 +229,23 @@ async fn test_remap_manifests_success_with_format(flat_format: bool) {
})
.await
.unwrap();
assert_eq!(result.new_manifests.len(), 2);
let new_manifest_1 =
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_1]).unwrap();
let new_manifest_2 =
serde_json::from_str::<RegionManifest>(&result.new_manifests[&new_region_id_2]).unwrap();
let region = engine.get_region(region_id).unwrap();
let manager = region.manifest_ctx.manifest_manager.write().await;
let manifest_storage = manager.store();
let blob_store = manifest_storage.staging_storage().blob_storage();
assert_eq!(result.manifest_paths.len(), 2);
common_telemetry::debug!("manifest paths: {:?}", result.manifest_paths);
let new_manifest_1 = blob_store
.get(&result.manifest_paths[&new_region_id_1])
.await
.unwrap();
let new_manifest_2 = blob_store
.get(&result.manifest_paths[&new_region_id_2])
.await
.unwrap();
let new_manifest_1 = serde_json::from_slice::<RegionManifest>(&new_manifest_1).unwrap();
let new_manifest_2 = serde_json::from_slice::<RegionManifest>(&new_manifest_2).unwrap();
assert_eq!(new_manifest_1.files.len(), 3);
assert_eq!(new_manifest_2.files.len(), 3);
}

View File

@@ -378,6 +378,11 @@ impl ManifestObjectStore {
pub async fn clear_staging_manifests(&mut self) -> Result<()> {
self.staging_storage.clear().await
}
/// Returns the staging storage.
pub(crate) fn staging_storage(&self) -> &StagingStorage {
&self.staging_storage
}
}
#[cfg(test)]

View File

@@ -26,20 +26,104 @@ use crate::manifest::storage::size_tracker::NoopTracker;
use crate::manifest::storage::utils::sort_manifests;
use crate::manifest::storage::{file_version, is_delta_file};
/// A simple blob storage for arbitrary binary data in the staging directory.
///
/// This is primarily used during repartition operations to store generated
/// manifests that will be consumed by other regions via [`ApplyStagingManifestRequest`](store_api::region_request::ApplyStagingManifestRequest).
/// The blobs are stored in `{region_dir}/staging/blob/` directory.
#[derive(Debug, Clone)]
pub(crate) struct StagingBlobStorage {
object_store: ObjectStore,
path: String,
}
/// Returns the staging path from the blob path.
///
/// # Example
/// - Input: `"data/table/region_0001/manifest/"`
/// - Output: `"data/table/region_0001/staging/blob/"`
pub fn staging_blob_path(manifest_path: &str) -> String {
let parent_dir = manifest_path
.trim_end_matches("manifest/")
.trim_end_matches('/');
util::normalize_dir(&format!("{}/staging/blob", parent_dir))
}
impl StagingBlobStorage {
pub fn new(path: String, object_store: ObjectStore) -> Self {
let path = util::normalize_dir(&path);
common_telemetry::debug!(
"Staging blob storage path: {}, root: {}",
path,
object_store.info().root()
);
Self { object_store, path }
}
/// Put the bytes to the blob storage.
pub async fn put(&self, path: &str, bytes: Vec<u8>) -> Result<()> {
let path = format!("{}{}", self.path, path);
common_telemetry::debug!(
"Putting blob to staging blob storage, path: {}, root: {}, bytes: {}",
path,
self.object_store.info().root(),
bytes.len()
);
self.object_store
.write(&path, bytes)
.await
.context(OpenDalSnafu)?;
Ok(())
}
/// Get the bytes from the blob storage.
pub async fn get(&self, path: &str) -> Result<Vec<u8>> {
let path = format!("{}{}", self.path, path);
common_telemetry::debug!(
"Reading blob from staging blob storage, path: {}, root: {}",
path,
self.object_store.info().root()
);
let bytes = self.object_store.read(&path).await.context(OpenDalSnafu)?;
Ok(bytes.to_vec())
}
}
/// Storage for staging manifest files and blobs used during repartition operations.
///
/// Fields:
/// - `delta_storage`: Manages incremental manifest delta files specific to the staging region.
/// - `blob_storage`: Manages arbitrary blobs, such as generated manifests for regions.
///
/// Directory structure:
/// - `{region_dir}/staging/manifest/` — for incremental manifest delta files for the staging region.
/// - `{region_dir}/staging/blob/` — for arbitrary blobs (e.g., generated region manifests).
#[derive(Debug, Clone)]
pub(crate) struct StagingStorage {
delta_storage: DeltaStorage<NoopTracker>,
blob_storage: StagingBlobStorage,
}
/// Returns the staging path from the manifest path.
///
/// # Example
/// - Input: `"data/table/region_0001/manifest/"`
/// - Output: `"data/table/region_0001/staging/manifest/"`
pub fn staging_manifest_path(manifest_path: &str) -> String {
let parent_dir = manifest_path
.trim_end_matches("manifest/")
.trim_end_matches('/');
util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
}
impl StagingStorage {
pub fn new(path: String, object_store: ObjectStore, compress_type: CompressionType) -> Self {
let staging_path = {
// Convert "region_dir/manifest/" to "region_dir/staging/manifest/"
let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
};
let staging_blob_path = staging_blob_path(&path);
let blob_storage = StagingBlobStorage::new(staging_blob_path, object_store.clone());
let staging_manifest_path = staging_manifest_path(&path);
let delta_storage = DeltaStorage::new(
staging_path.clone(),
staging_manifest_path.clone(),
object_store.clone(),
compress_type,
// StagingStorage does not use a manifest cache; set to None.
@@ -48,7 +132,16 @@ impl StagingStorage {
// deleted after exiting staging mode.
Arc::new(NoopTracker),
);
Self { delta_storage }
Self {
delta_storage,
blob_storage,
}
}
/// Returns the blob storage.
pub(crate) fn blob_storage(&self) -> &StagingBlobStorage {
&self.blob_storage
}
/// Returns an iterator of manifests from staging directory.
@@ -107,3 +200,22 @@ impl StagingStorage {
self.delta_storage.set_compress_type(compress_type);
}
}
#[cfg(test)]
mod tests {
use crate::manifest::storage::staging::{staging_blob_path, staging_manifest_path};
#[test]
fn test_staging_path() {
let path = "/data/table/region_0001/manifest/";
let expected = "/data/table/region_0001/staging/manifest/";
assert_eq!(staging_manifest_path(path), expected);
}
#[test]
fn test_staging_blob_path() {
let path = "/data/table/region_0001/manifest/";
let expected = "/data/table/region_0001/staging/blob/";
assert_eq!(staging_blob_path(path), expected);
}
}

View File

@@ -50,7 +50,7 @@ use crate::error::{
FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
Result, UnexpectedSnafu,
};
use crate::manifest::action::{RegionEdit, RegionManifest, TruncateKind};
use crate::manifest::action::{RegionEdit, TruncateKind};
use crate::memtable::MemtableId;
use crate::memtable::bulk::part::BulkPart;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
@@ -796,10 +796,7 @@ impl WorkerRequest {
region_mapping,
new_partition_exprs,
}: store_api::region_engine::RemapManifestsRequest,
) -> Result<(
WorkerRequest,
Receiver<Result<HashMap<RegionId, RegionManifest>>>,
)> {
) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
let (sender, receiver) = oneshot::channel();
let new_partition_exprs = new_partition_exprs
.into_iter()
@@ -1116,8 +1113,10 @@ pub(crate) struct RemapManifestsRequest {
pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
/// New partition expressions for the new regions.
pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
/// Result sender.
pub(crate) sender: Sender<Result<HashMap<RegionId, RegionManifest>>>,
/// Sender for the result of the remap operation.
///
/// The result is a map from region IDs to their corresponding staging manifest paths.
pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
}
#[derive(Debug)]

View File

@@ -21,12 +21,14 @@ use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::error::{
RegionStateSnafu, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu,
RegionStateSnafu, Result, SerdeJsonSnafu, StagingPartitionExprMismatchSnafu, UnexpectedSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, RegionEditRequest};
use crate::sst::file::FileMeta;
use crate::manifest::action::{RegionEdit, RegionManifest};
use crate::manifest::storage::manifest_dir;
use crate::manifest::storage::staging::{StagingBlobStorage, staging_blob_path};
use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, RegionEditRequest, WorkerRequest, WorkerRequestWithTime};
use crate::sst::location::region_dir_from_table_dir;
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -86,21 +88,32 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
let (tx, rx) = oneshot::channel();
let files_to_add = match serde_json::from_slice::<Vec<FileMeta>>(&request.files_to_add)
.context(SerdeJsonSnafu)
{
Ok(files_to_add) => files_to_add,
Err(e) => {
sender.send(Err(e));
let worker_sender = self.sender.clone();
common_runtime::spawn_global(async move {
let staging_manifest = match Self::fetch_staging_manifest(
&region,
request.central_region_id,
&request.manifest_path,
)
.await
{
Ok(staging_manifest) => staging_manifest,
Err(e) => {
sender.send(Err(e));
return;
}
};
if staging_manifest.metadata.partition_expr.as_ref() != Some(&request.partition_expr) {
sender.send(Err(StagingPartitionExprMismatchSnafu {
manifest_expr: staging_manifest.metadata.partition_expr.clone(),
request_expr: request.partition_expr,
}
.build()));
return;
}
};
info!("Applying staging manifest request to region {}", region_id);
self.handle_region_edit(RegionEditRequest {
region_id,
edit: RegionEdit {
let files_to_add = staging_manifest.files.values().cloned().collect::<Vec<_>>();
let edit = RegionEdit {
files_to_add,
files_to_remove: vec![],
timestamp_ms: Some(Utc::now().timestamp_millis()),
@@ -108,11 +121,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
flushed_entry_id: None,
flushed_sequence: None,
committed_sequence: None,
},
tx,
});
};
let (tx, rx) = oneshot::channel();
info!(
"Applying staging manifest request to region {}",
region.region_id,
);
let _ = worker_sender
.send(WorkerRequestWithTime::new(WorkerRequest::EditRegion(
RegionEditRequest {
region_id: region.region_id,
edit,
tx,
},
)))
.await;
common_runtime::spawn_global(async move {
// Await the result from the region edit and forward the outcome to the original sender.
// If the operation completes successfully, respond with Ok(0); otherwise, respond with an appropriate error.
if let Ok(result) = rx.await {
@@ -137,4 +162,25 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
});
}
/// Fetches the staging manifest from the central region's staging blob storage.
///
/// The `central_region_id` is used to locate the staging directory because the staging
/// manifest was created by the central region during `remap_manifests` operation.
async fn fetch_staging_manifest(
region: &MitoRegionRef,
central_region_id: RegionId,
manifest_path: &str,
) -> Result<RegionManifest> {
let region_dir =
region_dir_from_table_dir(region.table_dir(), central_region_id, region.path_type());
let staging_blob_path = staging_blob_path(&manifest_dir(&region_dir));
let staging_blob_storage = StagingBlobStorage::new(
staging_blob_path,
region.access_layer().object_store().clone(),
);
let staging_manifest = staging_blob_storage.get(manifest_path).await?;
serde_json::from_slice::<RegionManifest>(&staging_manifest).context(SerdeJsonSnafu)
}
}

View File

@@ -16,14 +16,13 @@ use std::collections::HashMap;
use std::time::Instant;
use common_error::ext::BoxedError;
use common_telemetry::info;
use common_telemetry::{debug, info};
use futures::future::try_join_all;
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result};
use crate::manifest::action::RegionManifest;
use crate::error::{self, FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result};
use crate::region::{MitoRegionRef, RegionMetadataLoader};
use crate::remap_manifest::RemapManifest;
use crate::request::RemapManifestsRequest;
@@ -75,13 +74,17 @@ impl<S> RegionWorkerLoop<S> {
});
}
// Fetches manifests for input regions, remaps them according to the provided
// mapping and partition expressions.
//
// Returns a map from each new region to its relative staging manifest path.
async fn fetch_and_remap_manifests(
region: MitoRegionRef,
region_metadata_loader: RegionMetadataLoader,
input_regions: Vec<RegionId>,
new_partition_exprs: HashMap<RegionId, PartitionExpr>,
region_mapping: HashMap<RegionId, Vec<RegionId>>,
) -> Result<HashMap<RegionId, RegionManifest>> {
) -> Result<HashMap<RegionId, String>> {
let mut tasks = Vec::with_capacity(input_regions.len());
let region_options = region.version().options.clone();
let table_dir = region.table_dir();
@@ -97,7 +100,6 @@ impl<S> RegionWorkerLoop<S> {
.await
});
}
let results = try_join_all(tasks)
.await
.map_err(BoxedError::new)
@@ -112,12 +114,38 @@ impl<S> RegionWorkerLoop<S> {
.collect::<Result<HashMap<_, _>>>()?;
let mut mapper = RemapManifest::new(manifests, new_partition_exprs, region_mapping);
let remap_result = mapper.remap_manifests()?;
// Write new manifests to staging blob storage.
let manifest_manager = region.manifest_ctx.manifest_manager.write().await;
let manifest_storage = manifest_manager.store();
let staging_blob_storage = manifest_storage.staging_storage().blob_storage().clone();
let mut tasks = Vec::with_capacity(remap_result.new_manifests.len());
for (remap_region_id, manifest) in &remap_result.new_manifests {
let bytes = serde_json::to_vec(&manifest).context(error::SerializeManifestSnafu {
region_id: *remap_region_id,
})?;
let key = remap_manifest_key(remap_region_id);
tasks.push(async {
debug!(
"Putting manifest to staging blob storage, region_id: {}, key: {}",
*remap_region_id, key
);
staging_blob_storage.put(&key, bytes).await?;
Ok((*remap_region_id, key))
});
}
let r = try_join_all(tasks).await?;
info!(
"Remap manifests cost: {:?}, region: {}",
now.elapsed(),
region.region_id
);
Ok(remap_result.new_manifests)
Ok(r.into_iter().collect::<HashMap<_, _>>())
}
}
fn remap_manifest_key(region_id: &RegionId) -> String {
format!("remap_manifest_{}", region_id.as_u64())
}

View File

@@ -759,8 +759,11 @@ pub struct RemapManifestsRequest {
/// Response to remap manifests from old regions to new regions.
#[derive(Debug, Clone)]
pub struct RemapManifestsResponse {
/// The new manifests for the new regions.
pub new_manifests: HashMap<RegionId, String>,
/// Maps region id to its staging manifest path.
///
/// These paths are relative paths within the central region's staging blob storage,
/// and should be passed to [`ApplyStagingManifestRequest`](RegionRequest::ApplyStagingManifest) to finalize the repartition.
pub manifest_paths: HashMap<RegionId, String>,
}
/// Request to copy files from a source region to a target region.

View File

@@ -421,20 +421,17 @@ fn make_region_apply_staging_manifest(
api::v1::region::ApplyStagingManifestRequest {
region_id,
partition_expr,
files_to_add,
central_region_id,
manifest_path,
}: api::v1::region::ApplyStagingManifestRequest,
) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = region_id.into();
let files_to_add = files_to_add
.context(UnexpectedSnafu {
reason: "'files_to_add' field is missing",
})?
.data;
Ok(vec![(
region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr,
files_to_add,
central_region_id: central_region_id.into(),
manifest_path,
}),
)])
}
@@ -1464,8 +1461,10 @@ pub struct EnterStagingRequest {
/// In practice, this means:
/// - The `partition_expr` identifies the staging region rule that the manifest
/// was generated for.
/// - `files_to_add` carries the serialized metadata (such as file manifests or
/// file lists) that should be attached to the region under the new rule.
/// - `central_region_id` specifies which region holds the staging blob storage
/// where the manifest was written during the `remap_manifests` operation.
/// - `manifest_path` is the relative path within the central region's staging
/// blob storage to fetch the generated manifest.
///
/// It should typically be called **after** the staging region has been
/// initialized by [`EnterStagingRequest`] and the new file layout has been
@@ -1474,8 +1473,11 @@ pub struct EnterStagingRequest {
pub struct ApplyStagingManifestRequest {
/// The partition expression of the staging region.
pub partition_expr: String,
/// The files to add to the region.
pub files_to_add: Vec<u8>,
/// The region that stores the staging manifests in its staging blob storage.
pub central_region_id: RegionId,
/// The relative path to the staging manifest within the central region's
/// staging blob storage.
pub manifest_path: String,
}
impl fmt::Display for RegionRequest {