feat: introduce copy_region_from for mito engine (#7389)

* feat: introduce `copy_region_from`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-12-16 14:12:06 +08:00
committed by GitHub
parent 9cd57e9342
commit f7d5c87ac0
16 changed files with 1213 additions and 75 deletions

View File

@@ -33,9 +33,9 @@ use servers::grpc::FlightCompression;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef; use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{ use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse,
SettableRegionRoleState, SyncManifestResponse, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
}; };
use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -299,6 +299,14 @@ impl RegionEngine for MockRegionEngine {
unimplemented!() unimplemented!()
} }
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any { fn as_any(&self) -> &dyn Any {
self self
} }

View File

@@ -26,10 +26,10 @@ use object_store::ObjectStore;
use snafu::{OptionExt, ensure}; use snafu::{OptionExt, ensure};
use store_api::metadata::RegionMetadataRef; use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{ use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse,
SetRegionRoleStateSuccess, SettableRegionRoleState, SinglePartitionScanner, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SyncManifestResponse, SinglePartitionScanner, SyncManifestResponse,
}; };
use store_api::region_request::{ use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
@@ -163,6 +163,19 @@ impl RegionEngine for FileRegionEngine {
)) ))
} }
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
Err(BoxedError::new(
UnsupportedSnafu {
operation: "copy_region_from",
}
.build(),
))
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> { fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id) self.inner.state(region_id)
} }

View File

@@ -43,9 +43,10 @@ pub(crate) use state::MetricEngineState;
use store_api::metadata::RegionMetadataRef; use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{ use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine,
RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
SetRegionRoleStateSuccess, SettableRegionRoleState, SyncManifestResponse, RemapManifestsResponse, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SyncManifestResponse,
}; };
use store_api::region_request::{ use store_api::region_request::{
BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest, BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
@@ -375,6 +376,14 @@ impl RegionEngine for MetricEngine {
} }
} }
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
todo!()
}
async fn set_region_role_state_gracefully( async fn set_region_role_state_gracefully(
&self, &self,
region_id: RegionId, region_id: RegionId,

View File

@@ -71,6 +71,8 @@ mod sync_test;
#[cfg(test)] #[cfg(test)]
mod truncate_test; mod truncate_test;
#[cfg(test)]
mod copy_region_from_test;
#[cfg(test)] #[cfg(test)]
mod remap_manifests_test; mod remap_manifests_test;
@@ -103,8 +105,9 @@ use store_api::metric_engine_consts::{
MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY, MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
}; };
use store_api::region_engine::{ use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, MitoCopyRegionFromResponse,
RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncManifestResponse, SettableRegionRoleState, SyncManifestResponse,
}; };
use store_api::region_request::{ use store_api::region_request::{
@@ -119,8 +122,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig; use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin}; use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{ use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, self, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu,
SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu, Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu,
}; };
#[cfg(feature = "enterprise")] #[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory; use crate::extension::BoxedExtensionRangeProviderFactory;
@@ -421,6 +424,17 @@ impl MitoEngine {
rx.await.context(RecvSnafu)? rx.await.context(RecvSnafu)?
} }
/// Handles copy region from request.
///
/// This method is only supported for internal use and is not exposed in the trait implementation.
pub async fn copy_region_from(
&self,
region_id: RegionId,
request: CopyRegionFromRequest,
) -> Result<MitoCopyRegionFromResponse> {
self.inner.copy_region_from(region_id, request).await
}
#[cfg(test)] #[cfg(test)]
pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> { pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
self.find_region(id) self.find_region(id)
@@ -621,7 +635,9 @@ impl MitoEngine {
} }
} }
/// Check whether the region edit is valid. Only adding files to region is considered valid now. /// Check whether the region edit is valid.
///
/// Only adding or removing files to region is considered valid now.
fn is_valid_region_edit(edit: &RegionEdit) -> bool { fn is_valid_region_edit(edit: &RegionEdit) -> bool {
!edit.files_to_add.is_empty() !edit.files_to_add.is_empty()
&& edit.files_to_remove.is_empty() && edit.files_to_remove.is_empty()
@@ -1054,6 +1070,18 @@ impl EngineInner {
Ok(RemapManifestsResponse { new_manifests }) Ok(RemapManifestsResponse { new_manifests })
} }
async fn copy_region_from(
&self,
region_id: RegionId,
request: CopyRegionFromRequest,
) -> Result<MitoCopyRegionFromResponse> {
let (request, receiver) =
WorkerRequest::try_from_copy_region_from_request(region_id, request)?;
self.workers.submit_to_worker(region_id, request).await?;
let response = receiver.await.context(RecvSnafu)??;
Ok(response)
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> { fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.workers.get_region(region_id).map(|region| { self.workers.get_region(region_id).map(|region| {
if region.is_follower() { if region.is_follower() {
@@ -1240,6 +1268,19 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new) .map_err(BoxedError::new)
} }
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
Err(BoxedError::new(
error::UnsupportedOperationSnafu {
err_msg: "copy_region_from is not supported",
}
.build(),
))
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> { fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.role(region_id) self.inner.role(region_id)
} }

View File

@@ -0,0 +1,361 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::fs;
use std::sync::Arc;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use object_store::layers::mock::{Error as MockError, ErrorKind, MockLayerBuilder};
use store_api::region_engine::{CopyRegionFromRequest, RegionEngine, RegionRole};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::Error;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
#[tokio::test]
async fn test_engine_copy_region_from() {
common_telemetry::init_default_ut_logging();
test_engine_copy_region_from_with_format(true, true).await;
test_engine_copy_region_from_with_format(true, false).await;
test_engine_copy_region_from_with_format(false, true).await;
test_engine_copy_region_from_with_format(false, false).await;
}
async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index: bool) {
let mut env = TestEnv::with_prefix("copy-region-from").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
// Creates a source region and adds some data
let source_region_id = RegionId::new(1, 1);
let mut request = CreateRequestBuilder::new().build();
if with_index {
request
.column_metadatas
.iter_mut()
.find(|c| c.column_schema.name == "tag_0")
.unwrap()
.column_schema
.set_inverted_index(true);
}
let column_schemas = rows_schema(&request);
engine
.handle_request(source_region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 42),
};
put_rows(&engine, source_region_id, rows).await;
engine
.handle_request(
source_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// Creates a target region and enters staging mode
let target_region_id = RegionId::new(1, 2);
engine
.handle_request(target_region_id, RegionRequest::Create(request))
.await
.unwrap();
common_telemetry::debug!("copy region from");
let resp = engine
.copy_region_from(
target_region_id,
CopyRegionFromRequest {
source_region_id,
parallelism: 1,
},
)
.await
.unwrap();
let manifest = engine
.get_region(target_region_id)
.unwrap()
.manifest_ctx
.manifest()
.await;
assert!(!manifest.files.is_empty());
for meta in manifest.files.values() {
assert_eq!(meta.region_id, target_region_id);
assert_eq!(meta.exists_index(), with_index);
}
let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display());
let source_region_files = collect_filename_in_dir(&source_region_dir);
let target_region_dir = format!("{}/data/test/1_0000000002", env.data_home().display());
let target_region_files = collect_filename_in_dir(&target_region_dir);
assert_eq!(source_region_files, target_region_files);
if with_index {
let source_region_index_files =
collect_filename_in_dir(&format!("{}/index", source_region_dir));
let target_region_index_files =
collect_filename_in_dir(&format!("{}/index", target_region_dir));
assert_eq!(source_region_index_files, target_region_index_files);
}
common_telemetry::debug!("copy region from again");
let resp2 = engine
.copy_region_from(
target_region_id,
CopyRegionFromRequest {
source_region_id,
parallelism: 1,
},
)
.await
.unwrap();
assert_eq!(resp.copied_file_ids, resp2.copied_file_ids);
}
#[tokio::test]
async fn test_engine_copy_region_failure() {
common_telemetry::init_default_ut_logging();
test_engine_copy_region_failure_with_format(false).await;
test_engine_copy_region_failure_with_format(true).await;
}
async fn test_engine_copy_region_failure_with_format(flat_format: bool) {
let mock_layer = MockLayerBuilder::default()
.copy_interceptor(Arc::new(|from, _, _args| {
if from.contains(".puffin") {
Some(Err(MockError::new(ErrorKind::Unexpected, "mock err")))
} else {
None
}
}))
.build()
.unwrap();
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
// Creates a source region and adds some data
let source_region_id = RegionId::new(1, 1);
let mut request = CreateRequestBuilder::new().build();
request
.column_metadatas
.iter_mut()
.find(|c| c.column_schema.name == "tag_0")
.unwrap()
.column_schema
.set_inverted_index(true);
let column_schemas = rows_schema(&request);
engine
.handle_request(source_region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 42),
};
put_rows(&engine, source_region_id, rows).await;
engine
.handle_request(
source_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display());
assert_file_num_in_dir(&source_region_dir, 1);
assert_file_num_in_dir(&format!("{}/index", source_region_dir), 1);
let source_region_files = collect_filename_in_dir(&source_region_dir);
let source_region_index_files =
collect_filename_in_dir(&format!("{}/index", source_region_dir));
// Creates a target region and enters staging mode
let target_region_id = RegionId::new(1, 2);
engine
.handle_request(target_region_id, RegionRequest::Create(request))
.await
.unwrap();
let err = engine
.copy_region_from(
target_region_id,
CopyRegionFromRequest {
source_region_id,
parallelism: 1,
},
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
// Check target region directory is empty
let target_region_dir = format!("{}/data/test/1_0000000002", env.data_home().display());
assert_file_num_in_dir(&target_region_dir, 0);
assert!(!fs::exists(format!("{}/index", target_region_dir)).unwrap());
// Check source region directory is not affected
let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display());
assert_file_num_in_dir(&source_region_dir, 1);
assert_file_num_in_dir(&format!("{}/index", source_region_dir), 1);
assert_eq!(
source_region_files,
collect_filename_in_dir(&source_region_dir)
);
assert_eq!(
source_region_index_files,
collect_filename_in_dir(&format!("{}/index", source_region_dir))
);
}
fn assert_file_num_in_dir(dir: &str, expected_num: usize) {
let files = fs::read_dir(dir)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap()
.into_iter()
.filter(|f| f.metadata().unwrap().is_file())
.collect::<Vec<_>>();
assert_eq!(
files.len(),
expected_num,
"The number of files in the directory should be {}, got: {:?}",
expected_num,
files
);
}
fn collect_filename_in_dir(dir: &str) -> Vec<String> {
let mut files = fs::read_dir(dir)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap()
.into_iter()
.filter(|f| f.metadata().unwrap().is_file())
.map(|f| {
f.path()
.to_string_lossy()
.rsplit("/")
.last()
.unwrap()
.to_string()
})
.collect::<Vec<_>>();
files.sort_unstable();
files
}
#[tokio::test]
async fn test_engine_copy_region_invalid_args() {
common_telemetry::init_default_ut_logging();
test_engine_copy_region_invalid_args_with_format(false).await;
test_engine_copy_region_invalid_args_with_format(true).await;
}
async fn test_engine_copy_region_invalid_args_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
let err = engine
.copy_region_from(
region_id,
CopyRegionFromRequest {
source_region_id: RegionId::new(2, 1),
parallelism: 1,
},
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
let err = engine
.copy_region_from(
region_id,
CopyRegionFromRequest {
source_region_id: RegionId::new(1, 1),
parallelism: 1,
},
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}
#[tokio::test]
async fn test_engine_copy_region_unexpected_state() {
common_telemetry::init_default_ut_logging();
test_engine_copy_region_unexpected_state_with_format(false).await;
test_engine_copy_region_unexpected_state_with_format(true).await;
}
async fn test_engine_copy_region_unexpected_state_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
let err = engine
.copy_region_from(
region_id,
CopyRegionFromRequest {
source_region_id: RegionId::new(1, 2),
parallelism: 1,
},
)
.await
.unwrap_err();
assert_matches!(
err.as_any().downcast_ref::<Error>().unwrap(),
Error::RegionState { .. }
)
}

View File

@@ -1185,6 +1185,18 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[snafu(display(
"Invalid source and target region, source: {}, target: {}",
source_region_id,
target_region_id
))]
InvalidSourceAndTargetRegion {
source_region_id: RegionId,
target_region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
} }
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1253,7 +1265,8 @@ impl ErrorExt for Error {
| MissingManifest { .. } | MissingManifest { .. }
| NoOldManifests { .. } | NoOldManifests { .. }
| MissingPartitionExpr { .. } | MissingPartitionExpr { .. }
| SerializePartitionExpr { .. } => StatusCode::InvalidArguments, | SerializePartitionExpr { .. }
| InvalidSourceAndTargetRegion { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } RegionMetadataNotFound { .. }
| Join { .. } | Join { .. }

View File

@@ -17,6 +17,7 @@
pub mod catchup; pub mod catchup;
pub mod opener; pub mod opener;
pub mod options; pub mod options;
pub mod utils;
pub(crate) mod version; pub(crate) mod version;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
@@ -34,9 +35,11 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{ use store_api::region_engine::{
RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState, RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
}; };
use store_api::region_request::PathType;
use store_api::sst_entry::ManifestSstEntry; use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{RegionId, SequenceNumber}; use store_api::storage::{FileId, RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard; use tokio::sync::RwLockWriteGuard;
pub use utils::*;
use crate::access_layer::AccessLayerRef; use crate::access_layer::AccessLayerRef;
use crate::error::{ use crate::error::{
@@ -49,6 +52,7 @@ use crate::manifest::action::{
use crate::manifest::manager::RegionManifestManager; use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef}; use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx}; use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef; use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path}; use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef; use crate::time_provider::TimeProviderRef;
@@ -215,6 +219,11 @@ impl MitoRegion {
self.access_layer.table_dir() self.access_layer.table_dir()
} }
/// Returns the path type of the region.
pub(crate) fn path_type(&self) -> PathType {
self.access_layer.path_type()
}
/// Returns whether the region is writable. /// Returns whether the region is writable.
pub(crate) fn is_writable(&self) -> bool { pub(crate) fn is_writable(&self) -> bool {
matches!( matches!(
@@ -657,6 +666,16 @@ impl MitoRegion {
.collect() .collect()
} }
/// Returns the file metas of the region by file ids.
pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
let manifest_files = self.manifest_ctx.manifest().await.files.clone();
file_ids
.iter()
.map(|file_id| manifest_files.get(file_id).cloned())
.collect::<Vec<_>>()
}
/// Exit staging mode successfully by merging all staged manifests and making them visible. /// Exit staging mode successfully by merging all staged manifests and making them visible.
pub(crate) async fn exit_staging_on_success( pub(crate) async fn exit_staging_on_success(
&self, &self,

View File

@@ -636,54 +636,6 @@ pub fn get_object_store(
} }
} }
/// A loader for loading metadata from a region dir.
#[derive(Debug, Clone)]
pub struct RegionMetadataLoader {
config: Arc<MitoConfig>,
object_store_manager: ObjectStoreManagerRef,
}
impl RegionMetadataLoader {
/// Creates a new `RegionOpenerBuilder`.
pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
Self {
config,
object_store_manager,
}
}
/// Loads the metadata of the region from the region dir.
pub async fn load(
&self,
region_dir: &str,
region_options: &RegionOptions,
) -> Result<Option<RegionMetadataRef>> {
let manifest = self
.load_manifest(region_dir, &region_options.storage)
.await?;
Ok(manifest.map(|m| m.metadata.clone()))
}
/// Loads the manifest of the region from the region dir.
pub async fn load_manifest(
&self,
region_dir: &str,
storage: &Option<String>,
) -> Result<Option<Arc<RegionManifest>>> {
let object_store = get_object_store(storage, &self.object_store_manager)?;
let region_manifest_options =
RegionManifestOptions::new(&self.config, region_dir, &object_store);
let Some(manifest_manager) =
RegionManifestManager::open(region_manifest_options, &Default::default()).await?
else {
return Ok(None);
};
let manifest = manifest_manager.manifest();
Ok(Some(manifest))
}
}
/// Checks whether the recovered region has the same schema as region to create. /// Checks whether the recovered region has the same schema as region to create.
pub(crate) fn check_recovered_region( pub(crate) fn check_recovered_region(
recovered: &RegionMetadata, recovered: &RegionMetadata,

View File

@@ -0,0 +1,345 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Instant;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::PathType;
use store_api::storage::{FileId, IndexVersion, RegionId};
use crate::access_layer::AccessLayerRef;
use crate::config::MitoConfig;
use crate::error::{self, InvalidSourceAndTargetRegionSnafu, Result};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::opener::get_object_store;
use crate::region::options::RegionOptions;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::location;
/// A loader for loading metadata from a region dir.
#[derive(Debug, Clone)]
pub struct RegionMetadataLoader {
config: Arc<MitoConfig>,
object_store_manager: ObjectStoreManagerRef,
}
impl RegionMetadataLoader {
/// Creates a new `RegionMetadataLoader`.
pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
Self {
config,
object_store_manager,
}
}
/// Loads the metadata of the region from the region dir.
pub async fn load(
&self,
region_dir: &str,
region_options: &RegionOptions,
) -> Result<Option<RegionMetadataRef>> {
let manifest = self
.load_manifest(region_dir, &region_options.storage)
.await?;
Ok(manifest.map(|m| m.metadata.clone()))
}
/// Loads the manifest of the region from the region dir.
pub async fn load_manifest(
&self,
region_dir: &str,
storage: &Option<String>,
) -> Result<Option<Arc<RegionManifest>>> {
let object_store = get_object_store(storage, &self.object_store_manager)?;
let region_manifest_options =
RegionManifestOptions::new(&self.config, region_dir, &object_store);
let Some(manifest_manager) =
RegionManifestManager::open(region_manifest_options, &Default::default()).await?
else {
return Ok(None);
};
let manifest = manifest_manager.manifest();
Ok(Some(manifest))
}
}
/// A copier for copying files from a region to another region.
#[derive(Debug, Clone)]
pub struct RegionFileCopier {
access_layer: AccessLayerRef,
}
/// A descriptor for a file.
#[derive(Debug, Clone, Copy)]
pub enum FileDescriptor {
/// An index file.
Index {
file_id: FileId,
version: IndexVersion,
size: u64,
},
/// A data file.
Data { file_id: FileId, size: u64 },
}
impl FileDescriptor {
pub fn size(&self) -> u64 {
match self {
FileDescriptor::Index { size, .. } => *size,
FileDescriptor::Data { size, .. } => *size,
}
}
}
/// Builds the source and target file paths for a given file descriptor.
///
/// # Arguments
///
/// * `source_region_id`: The ID of the source region.
/// * `target_region_id`: The ID of the target region.
/// * `file_id`: The ID of the file.
///
/// # Returns
///
/// A tuple containing the source and target file paths.
fn build_copy_file_paths(
source_region_id: RegionId,
target_region_id: RegionId,
file_descriptor: FileDescriptor,
table_dir: &str,
path_type: PathType,
) -> (String, String) {
match file_descriptor {
FileDescriptor::Index {
file_id, version, ..
} => (
location::index_file_path(
table_dir,
RegionIndexId::new(RegionFileId::new(source_region_id, file_id), version),
path_type,
),
location::index_file_path(
table_dir,
RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
path_type,
),
),
FileDescriptor::Data { file_id, .. } => (
location::sst_file_path(
table_dir,
RegionFileId::new(source_region_id, file_id),
path_type,
),
location::sst_file_path(
table_dir,
RegionFileId::new(target_region_id, file_id),
path_type,
),
),
}
}
fn build_delete_file_path(
target_region_id: RegionId,
file_descriptor: FileDescriptor,
table_dir: &str,
path_type: PathType,
) -> String {
match file_descriptor {
FileDescriptor::Index {
file_id, version, ..
} => location::index_file_path(
table_dir,
RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
path_type,
),
FileDescriptor::Data { file_id, .. } => location::sst_file_path(
table_dir,
RegionFileId::new(target_region_id, file_id),
path_type,
),
}
}
impl RegionFileCopier {
pub fn new(access_layer: AccessLayerRef) -> Self {
Self { access_layer }
}
/// Copies files from a source region to a target region.
///
/// # Arguments
///
/// * `source_region_id`: The ID of the source region.
/// * `target_region_id`: The ID of the target region.
/// * `file_ids`: The IDs of the files to copy.
pub async fn copy_files(
&self,
source_region_id: RegionId,
target_region_id: RegionId,
file_ids: Vec<FileDescriptor>,
parallelism: usize,
) -> Result<()> {
ensure!(
source_region_id.table_id() == target_region_id.table_id(),
InvalidSourceAndTargetRegionSnafu {
source_region_id,
target_region_id,
},
);
let table_dir = self.access_layer.table_dir();
let path_type = self.access_layer.path_type();
let object_store = self.access_layer.object_store();
info!(
"Copying {} files from region {} to region {}",
file_ids.len(),
source_region_id,
target_region_id
);
debug!(
"Copying files: {:?} from region {} to region {}",
file_ids, source_region_id, target_region_id
);
let mut tasks = Vec::with_capacity(parallelism);
for skip in 0..parallelism {
let target_file_ids = file_ids.iter().skip(skip).step_by(parallelism).copied();
let object_store = object_store.clone();
tasks.push(async move {
for file_desc in target_file_ids {
let (source_path, target_path) = build_copy_file_paths(
source_region_id,
target_region_id,
file_desc,
table_dir,
path_type,
);
let now = Instant::now();
object_store
.copy(&source_path, &target_path)
.await
.inspect_err(
|e| error!(e; "Failed to copy file {} to {}", source_path, target_path),
)
.context(error::OpenDalSnafu)?;
let file_size = ReadableSize(file_desc.size());
info!(
"Copied file {} to {}, file size: {}, elapsed: {:?}",
source_path,
target_path,
file_size,
now.elapsed(),
);
}
Ok(())
});
}
if let Err(err) = try_join_all(tasks).await {
error!(err; "Failed to copy files from region {} to region {}", source_region_id, target_region_id);
self.clean_target_region(target_region_id, file_ids).await;
return Err(err);
}
Ok(())
}
/// Cleans the copied files from the target region.
async fn clean_target_region(&self, target_region_id: RegionId, file_ids: Vec<FileDescriptor>) {
let table_dir = self.access_layer.table_dir();
let path_type = self.access_layer.path_type();
let object_store = self.access_layer.object_store();
let delete_file_path = file_ids
.into_iter()
.map(|file_descriptor| {
build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type)
})
.collect::<Vec<_>>();
debug!(
"Deleting files: {:?} after failed to copy files to target region {}",
delete_file_path, target_region_id
);
if let Err(err) = object_store.delete_iter(delete_file_path).await {
error!(err; "Failed to delete files from region {}", target_region_id);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_build_copy_file_paths() {
common_telemetry::init_default_ut_logging();
let file_id = FileId::random();
let source_region_id = RegionId::new(1, 1);
let target_region_id = RegionId::new(1, 2);
let file_descriptor = FileDescriptor::Data { file_id, size: 100 };
let table_dir = "/table_dir";
let path_type = PathType::Bare;
let (source_path, target_path) = build_copy_file_paths(
source_region_id,
target_region_id,
file_descriptor,
table_dir,
path_type,
);
assert_eq!(
source_path,
format!("/table_dir/1_0000000001/{}.parquet", file_id)
);
assert_eq!(
target_path,
format!("/table_dir/1_0000000002/{}.parquet", file_id)
);
let version = 1;
let file_descriptor = FileDescriptor::Index {
file_id,
version,
size: 100,
};
let (source_path, target_path) = build_copy_file_paths(
source_region_id,
target_region_id,
file_descriptor,
table_dir,
path_type,
);
assert_eq!(
source_path,
format!(
"/table_dir/1_0000000001/index/{}.{}.puffin",
file_id, version
)
);
assert_eq!(
target_path,
format!(
"/table_dir/1_0000000002/index/{}.{}.puffin",
file_id, version
)
);
}
}

View File

@@ -33,7 +33,9 @@ use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion; use store_api::ManifestVersion;
use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint}; use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; use store_api::region_engine::{
MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_request::{ use store_api::region_request::{
AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest, AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
@@ -605,6 +607,9 @@ pub(crate) enum WorkerRequest {
/// Remap manifests request. /// Remap manifests request.
RemapManifests(RemapManifestsRequest), RemapManifests(RemapManifestsRequest),
/// Copy region from request.
CopyRegionFrom(CopyRegionFromRequest),
} }
impl WorkerRequest { impl WorkerRequest {
@@ -813,6 +818,24 @@ impl WorkerRequest {
Ok((WorkerRequest::RemapManifests(request), receiver)) Ok((WorkerRequest::RemapManifests(request), receiver))
} }
/// Converts [CopyRegionFromRequest] from a [CopyRegionFromRequest](store_api::region_engine::CopyRegionFromRequest).
pub(crate) fn try_from_copy_region_from_request(
region_id: RegionId,
store_api::region_engine::CopyRegionFromRequest {
source_region_id,
parallelism,
}: store_api::region_engine::CopyRegionFromRequest,
) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
let (sender, receiver) = oneshot::channel();
let request = CopyRegionFromRequest {
region_id,
source_region_id,
parallelism,
sender,
};
Ok((WorkerRequest::CopyRegionFrom(request), receiver))
}
} }
/// DDL request to a region. /// DDL request to a region.
@@ -867,6 +890,8 @@ pub(crate) enum BackgroundNotify {
RegionEdit(RegionEditResult), RegionEdit(RegionEditResult),
/// Enter staging result. /// Enter staging result.
EnterStaging(EnterStagingResult), EnterStaging(EnterStagingResult),
/// Copy region result.
CopyRegionFromFinished(CopyRegionFromFinished),
} }
/// Notifies a flush job is finished. /// Notifies a flush job is finished.
@@ -1023,6 +1048,16 @@ pub(crate) struct EnterStagingResult {
pub(crate) result: Result<()>, pub(crate) result: Result<()>,
} }
#[derive(Debug)]
pub(crate) struct CopyRegionFromFinished {
/// Region id.
pub(crate) region_id: RegionId,
/// Region edit to apply.
pub(crate) edit: RegionEdit,
/// Result sender.
pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
/// Request to edit a region directly. /// Request to edit a region directly.
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct RegionEditRequest { pub(crate) struct RegionEditRequest {
@@ -1077,6 +1112,18 @@ pub(crate) struct RemapManifestsRequest {
pub(crate) sender: Sender<Result<HashMap<RegionId, RegionManifest>>>, pub(crate) sender: Sender<Result<HashMap<RegionId, RegionManifest>>>,
} }
#[derive(Debug)]
pub(crate) struct CopyRegionFromRequest {
/// The [`RegionId`] of the target region.
pub(crate) region_id: RegionId,
/// The [`RegionId`] of the source region.
pub(crate) source_region_id: RegionId,
/// The parallelism of the copy operation.
pub(crate) parallelism: usize,
/// Result sender.
pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use api::v1::value::ValueData; use api::v1::value::ValueData;

View File

@@ -19,6 +19,7 @@ mod handle_bulk_insert;
mod handle_catchup; mod handle_catchup;
mod handle_close; mod handle_close;
mod handle_compaction; mod handle_compaction;
mod handle_copy_region;
mod handle_create; mod handle_create;
mod handle_drop; mod handle_drop;
mod handle_enter_staging; mod handle_enter_staging;
@@ -1039,6 +1040,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
WorkerRequest::RemapManifests(req) => { WorkerRequest::RemapManifests(req) => {
self.handle_remap_manifests_request(req); self.handle_remap_manifests_request(req);
} }
WorkerRequest::CopyRegionFrom(req) => {
self.handle_copy_region_from_request(req);
}
} }
} }
@@ -1154,6 +1158,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
} }
BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await, BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await, BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
BackgroundNotify::CopyRegionFromFinished(req) => {
self.handle_copy_region_from_finished(req)
}
} }
} }

View File

@@ -0,0 +1,245 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{debug, error, info};
use snafu::OptionExt;
use store_api::region_engine::MitoCopyRegionFromResponse;
use store_api::storage::{FileId, RegionId};
use crate::error::{InvalidRequestSnafu, MissingManifestSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::{FileDescriptor, MitoRegionRef, RegionFileCopier, RegionMetadataLoader};
use crate::request::{
BackgroundNotify, CopyRegionFromFinished, CopyRegionFromRequest, WorkerRequest,
};
use crate::sst::location::region_dir_from_table_dir;
use crate::worker::{RegionWorkerLoop, WorkerRequestWithTime};
impl<S> RegionWorkerLoop<S> {
pub(crate) fn handle_copy_region_from_request(&mut self, request: CopyRegionFromRequest) {
let region_id = request.region_id;
let source_region_id = request.source_region_id;
let sender = request.sender;
let region = match self.regions.writable_region(region_id) {
Ok(region) => region,
Err(e) => {
let _ = sender.send(Err(e));
return;
}
};
let same_table = source_region_id.table_id() == region_id.table_id();
if !same_table {
let _ = sender.send(
InvalidRequestSnafu {
region_id,
reason: format!("Source and target regions must be from the same table, source_region_id: {source_region_id}, target_region_id: {region_id}"),
}
.fail(),
);
return;
}
if source_region_id == region_id {
let _ = sender.send(
InvalidRequestSnafu {
region_id,
reason: format!("Source and target regions must be different, source_region_id: {source_region_id}, target_region_id: {region_id}"),
}
.fail(),
);
return;
}
let region_metadata_loader =
RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
let worker_sender = self.sender.clone();
common_runtime::spawn_global(async move {
let (region_edit, file_ids) = match Self::copy_region_from(
&region,
region_metadata_loader,
source_region_id,
region_id,
request.parallelism.max(1),
)
.await
{
Ok(region_files) => region_files,
Err(e) => {
let _ = sender.send(Err(e));
return;
}
};
match region_edit {
Some(region_edit) => {
if let Err(e) = worker_sender
.send(WorkerRequestWithTime::new(WorkerRequest::Background {
region_id,
notify: BackgroundNotify::CopyRegionFromFinished(
CopyRegionFromFinished {
region_id,
edit: region_edit,
sender,
},
),
}))
.await
{
error!(e; "Failed to send copy region from finished notification to worker, region_id: {}", region_id);
}
}
None => {
let _ = sender.send(Ok(MitoCopyRegionFromResponse {
copied_file_ids: file_ids,
}));
}
}
});
}
pub(crate) fn handle_copy_region_from_finished(&mut self, request: CopyRegionFromFinished) {
let region_id = request.region_id;
let sender = request.sender;
let region = match self.regions.writable_region(region_id) {
Ok(region) => region,
Err(e) => {
let _ = sender.send(Err(e));
return;
}
};
let copied_file_ids = request
.edit
.files_to_add
.iter()
.map(|file_meta| file_meta.file_id)
.collect();
region
.version_control
.apply_edit(Some(request.edit), &[], region.file_purger.clone());
let _ = sender.send(Ok(MitoCopyRegionFromResponse { copied_file_ids }));
}
/// Returns the region edit and the file ids that were copied from the source region to the target region.
///
/// If no need to copy files, returns (None, file_ids).
async fn copy_region_from(
region: &MitoRegionRef,
region_metadata_loader: RegionMetadataLoader,
source_region_id: RegionId,
target_region_id: RegionId,
parallelism: usize,
) -> Result<(Option<RegionEdit>, Vec<FileId>)> {
let table_dir = region.table_dir();
let path_type = region.path_type();
let region_dir = region_dir_from_table_dir(table_dir, source_region_id, path_type);
info!(
"Loading source region manifest from region dir: {region_dir}, target region: {target_region_id}"
);
let source_region_manifest = region_metadata_loader
.load_manifest(&region_dir, &region.version().options.storage)
.await?
.context(MissingManifestSnafu {
region_id: source_region_id,
})?;
let mut files_to_copy = vec![];
let target_region_manifest = region.manifest_ctx.manifest().await;
let file_ids = source_region_manifest
.files
.keys()
.cloned()
.collect::<Vec<_>>();
debug!(
"source region files: {:?}, source region id: {}",
source_region_manifest.files, source_region_id
);
for (file_id, file_meta) in &source_region_manifest.files {
if !target_region_manifest.files.contains_key(file_id) {
let mut new_file_meta = file_meta.clone();
new_file_meta.region_id = target_region_id;
files_to_copy.push(new_file_meta);
}
}
if files_to_copy.is_empty() {
return Ok((None, file_ids));
}
let file_descriptors = files_to_copy
.iter()
.flat_map(|file_meta| {
if file_meta.exists_index() {
let region_index_id = file_meta.index_id();
let file_id = region_index_id.file_id.file_id();
let version = region_index_id.version;
let file_size = file_meta.file_size;
let index_file_size = file_meta.index_file_size();
vec![
FileDescriptor::Data {
file_id: file_meta.file_id,
size: file_size,
},
FileDescriptor::Index {
file_id,
version,
size: index_file_size,
},
]
} else {
let file_size = file_meta.file_size;
vec![FileDescriptor::Data {
file_id: file_meta.file_id,
size: file_size,
}]
}
})
.collect();
debug!("File descriptors to copy: {:?}", file_descriptors);
let copier = RegionFileCopier::new(region.access_layer());
// TODO(weny): ensure the target region is empty.
copier
.copy_files(
source_region_id,
target_region_id,
file_descriptors,
parallelism,
)
.await?;
let edit = RegionEdit {
files_to_add: files_to_copy,
files_to_remove: vec![],
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
committed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
info!("Applying {edit:?} to region {target_region_id}, reason: CopyRegionFrom");
let version = region
.manifest_ctx
.manifest_manager
.write()
.await
.update(action_list, false)
.await?;
info!(
"Successfully update manifest version to {version}, region: {target_region_id}, reason: CopyRegionFrom"
);
Ok((Some(edit), file_ids))
}
}

View File

@@ -20,13 +20,11 @@ use common_telemetry::info;
use futures::future::try_join_all; use futures::future::try_join_all;
use partition::expr::PartitionExpr; use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use store_api::region_request::PathType;
use store_api::storage::RegionId; use store_api::storage::RegionId;
use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result}; use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result};
use crate::manifest::action::RegionManifest; use crate::manifest::action::RegionManifest;
use crate::region::MitoRegionRef; use crate::region::{MitoRegionRef, RegionMetadataLoader};
use crate::region::opener::RegionMetadataLoader;
use crate::remap_manifest::RemapManifest; use crate::remap_manifest::RemapManifest;
use crate::request::RemapManifestsRequest; use crate::request::RemapManifestsRequest;
use crate::sst::location::region_dir_from_table_dir; use crate::sst::location::region_dir_from_table_dir;
@@ -87,10 +85,10 @@ impl<S> RegionWorkerLoop<S> {
let mut tasks = Vec::with_capacity(input_regions.len()); let mut tasks = Vec::with_capacity(input_regions.len());
let region_options = region.version().options.clone(); let region_options = region.version().options.clone();
let table_dir = region.table_dir(); let table_dir = region.table_dir();
let path_type = region.path_type();
let now = Instant::now(); let now = Instant::now();
for input_region in &input_regions { for input_region in &input_regions {
let region_dir = region_dir_from_table_dir(table_dir, *input_region, PathType::Bare); let region_dir = region_dir_from_table_dir(table_dir, *input_region, path_type);
let storage = region_options.storage.clone(); let storage = region_options.storage.clone();
let moved_region_metadata_loader = region_metadata_loader.clone(); let moved_region_metadata_loader = region_metadata_loader.clone();
tasks.push(async move { tasks.push(async move {

View File

@@ -21,12 +21,14 @@ pub use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite, oio, RpWrite, oio,
}; };
use opendal::raw::{OpCopy, RpCopy};
pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result}; pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result};
pub type MockWriterFactory = Arc<dyn Fn(&str, OpWrite, oio::Writer) -> oio::Writer + Send + Sync>; pub type MockWriterFactory = Arc<dyn Fn(&str, OpWrite, oio::Writer) -> oio::Writer + Send + Sync>;
pub type MockReaderFactory = Arc<dyn Fn(&str, OpRead, oio::Reader) -> oio::Reader + Send + Sync>; pub type MockReaderFactory = Arc<dyn Fn(&str, OpRead, oio::Reader) -> oio::Reader + Send + Sync>;
pub type MockListerFactory = Arc<dyn Fn(&str, OpList, oio::Lister) -> oio::Lister + Send + Sync>; pub type MockListerFactory = Arc<dyn Fn(&str, OpList, oio::Lister) -> oio::Lister + Send + Sync>;
pub type MockDeleterFactory = Arc<dyn Fn(oio::Deleter) -> oio::Deleter + Send + Sync>; pub type MockDeleterFactory = Arc<dyn Fn(oio::Deleter) -> oio::Deleter + Send + Sync>;
pub type CopyInterceptor = Arc<dyn Fn(&str, &str, OpCopy) -> Option<Result<RpCopy>> + Send + Sync>;
#[derive(Builder)] #[derive(Builder)]
pub struct MockLayer { pub struct MockLayer {
@@ -38,6 +40,8 @@ pub struct MockLayer {
lister_factory: Option<MockListerFactory>, lister_factory: Option<MockListerFactory>,
#[builder(setter(strip_option), default)] #[builder(setter(strip_option), default)]
deleter_factory: Option<MockDeleterFactory>, deleter_factory: Option<MockDeleterFactory>,
#[builder(setter(strip_option), default)]
copy_interceptor: Option<CopyInterceptor>,
} }
impl Clone for MockLayer { impl Clone for MockLayer {
@@ -47,6 +51,7 @@ impl Clone for MockLayer {
reader_factory: self.reader_factory.clone(), reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(), lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(), deleter_factory: self.deleter_factory.clone(),
copy_interceptor: self.copy_interceptor.clone(),
} }
} }
} }
@@ -61,6 +66,7 @@ impl<A: Access> Layer<A> for MockLayer {
reader_factory: self.reader_factory.clone(), reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(), lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(), deleter_factory: self.deleter_factory.clone(),
copy_interceptor: self.copy_interceptor.clone(),
} }
} }
} }
@@ -71,6 +77,7 @@ pub struct MockAccessor<A> {
reader_factory: Option<MockReaderFactory>, reader_factory: Option<MockReaderFactory>,
lister_factory: Option<MockListerFactory>, lister_factory: Option<MockListerFactory>,
deleter_factory: Option<MockDeleterFactory>, deleter_factory: Option<MockDeleterFactory>,
copy_interceptor: Option<CopyInterceptor>,
} }
impl<A: Debug> Debug for MockAccessor<A> { impl<A: Debug> Debug for MockAccessor<A> {
@@ -214,4 +221,16 @@ impl<A: Access> LayeredAccess for MockAccessor<A> {
}) })
} }
} }
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let Some(copy_interceptor) = self.copy_interceptor.as_ref() else {
return self.inner.copy(from, to, args).await;
};
let Some(result) = copy_interceptor(from, to, args.clone()) else {
return self.inner.copy(from, to, args).await;
};
result
}
} }

View File

@@ -28,9 +28,9 @@ use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
}; };
use store_api::region_engine::{ use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse,
SettableRegionRoleState, SyncManifestResponse, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
}; };
use store_api::region_request::RegionRequest; use store_api::region_request::RegionRequest;
use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber}; use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber};
@@ -125,6 +125,14 @@ impl RegionEngine for MetaRegionEngine {
unimplemented!() unimplemented!()
} }
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
unimplemented!()
}
fn role(&self, _region_id: RegionId) -> Option<RegionRole> { fn role(&self, _region_id: RegionId) -> Option<RegionRole> {
None None
} }

View File

@@ -37,7 +37,7 @@ use crate::metadata::RegionMetadataRef;
use crate::region_request::{ use crate::region_request::{
BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest, BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
}; };
use crate::storage::{RegionId, ScanRequest, SequenceNumber}; use crate::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
/// The settable region role state. /// The settable region role state.
#[derive(Debug, PartialEq, Eq, Clone, Copy)] #[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -713,6 +713,52 @@ pub struct RemapManifestsResponse {
pub new_manifests: HashMap<RegionId, String>, pub new_manifests: HashMap<RegionId, String>,
} }
/// Request to copy files from a source region to a target region.
#[derive(Debug, Clone)]
pub struct CopyRegionFromRequest {
/// The [`RegionId`] of the source region.
pub source_region_id: RegionId,
/// The parallelism of the copy operation.
pub parallelism: usize,
}
#[derive(Debug, Clone)]
pub struct MitoCopyRegionFromResponse {
/// The file ids that were copied from the source region to the target region.
pub copied_file_ids: Vec<FileId>,
}
#[derive(Debug, Clone)]
pub struct MetricCopyRegionFromResponse {
/// The logical regions that were newly opened after the copy operation.
pub new_opened_logical_region_ids: Vec<RegionId>,
}
/// Response to copy region from a source region to a target region.
#[derive(Debug, Clone)]
pub enum CopyRegionFromResponse {
Mito(MitoCopyRegionFromResponse),
Metric(MetricCopyRegionFromResponse),
}
impl CopyRegionFromResponse {
/// Converts the response to a mito2 response.
pub fn into_mito(self) -> Option<MitoCopyRegionFromResponse> {
match self {
CopyRegionFromResponse::Mito(response) => Some(response),
CopyRegionFromResponse::Metric(_) => None,
}
}
/// Converts the response to a metric response.
pub fn into_metric(self) -> Option<MetricCopyRegionFromResponse> {
match self {
CopyRegionFromResponse::Metric(response) => Some(response),
CopyRegionFromResponse::Mito(_) => None,
}
}
}
#[async_trait] #[async_trait]
pub trait RegionEngine: Send + Sync { pub trait RegionEngine: Send + Sync {
/// Name of this engine /// Name of this engine
@@ -843,6 +889,13 @@ pub trait RegionEngine: Send + Sync {
request: RemapManifestsRequest, request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse, BoxedError>; ) -> Result<RemapManifestsResponse, BoxedError>;
/// Copies region from a source region to a target region.
async fn copy_region_from(
&self,
region_id: RegionId,
request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError>;
/// Sets region role state gracefully. /// Sets region role state gracefully.
/// ///
/// After the call returns, the engine ensures no more write operations will succeed in the region. /// After the call returns, the engine ensures no more write operations will succeed in the region.