diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 8a011c1f23..3fe4954aea 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -33,9 +33,9 @@ use servers::grpc::FlightCompression; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, - SettableRegionRoleState, SyncManifestResponse, + CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole, + RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, + SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -299,6 +299,14 @@ impl RegionEngine for MockRegionEngine { unimplemented!() } + async fn copy_region_from( + &self, + _region_id: RegionId, + _request: CopyRegionFromRequest, + ) -> Result { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 231583beb6..5dd787b919 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -26,10 +26,10 @@ use object_store::ObjectStore; use snafu::{OptionExt, ensure}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, - SetRegionRoleStateSuccess, SettableRegionRoleState, SinglePartitionScanner, - SyncManifestResponse, + CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole, + RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, + SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, + SinglePartitionScanner, SyncManifestResponse, }; use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, @@ -163,6 +163,19 @@ impl RegionEngine for FileRegionEngine { )) } + async fn copy_region_from( + &self, + _region_id: RegionId, + _request: CopyRegionFromRequest, + ) -> Result { + Err(BoxedError::new( + UnsupportedSnafu { + operation: "copy_region_from", + } + .build(), + )) + } + fn role(&self, region_id: RegionId) -> Option { self.inner.state(region_id) } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 3e5a1e3c48..9a4a2ef9df 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -43,9 +43,10 @@ pub(crate) use state::MetricEngineState; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ - BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, - RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, - SetRegionRoleStateSuccess, SettableRegionRoleState, SyncManifestResponse, + BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, + RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest, + RemapManifestsResponse, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, + SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::{ BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest, @@ -375,6 +376,14 @@ impl RegionEngine for MetricEngine { } } + async fn copy_region_from( + &self, + _region_id: RegionId, + _request: CopyRegionFromRequest, + ) -> Result { + todo!() + } + async fn set_region_role_state_gracefully( &self, region_id: RegionId, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 798ec5eded..10c116a3a7 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -71,6 +71,8 @@ mod sync_test; #[cfg(test)] mod truncate_test; +#[cfg(test)] +mod copy_region_from_test; #[cfg(test)] mod remap_manifests_test; @@ -103,8 +105,9 @@ use store_api::metric_engine_consts::{ MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY, }; use store_api::region_engine::{ - BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, - RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, + BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, MitoCopyRegionFromResponse, + RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, + RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::{ @@ -119,8 +122,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::config::MitoConfig; use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin}; use crate::error::{ - InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, - SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu, + self, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, + Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu, }; #[cfg(feature = "enterprise")] use crate::extension::BoxedExtensionRangeProviderFactory; @@ -421,6 +424,17 @@ impl MitoEngine { rx.await.context(RecvSnafu)? } + /// Handles copy region from request. + /// + /// This method is only supported for internal use and is not exposed in the trait implementation. + pub async fn copy_region_from( + &self, + region_id: RegionId, + request: CopyRegionFromRequest, + ) -> Result { + self.inner.copy_region_from(region_id, request).await + } + #[cfg(test)] pub(crate) fn get_region(&self, id: RegionId) -> Option { self.find_region(id) @@ -621,7 +635,9 @@ impl MitoEngine { } } -/// Check whether the region edit is valid. Only adding files to region is considered valid now. +/// Check whether the region edit is valid. +/// +/// Only adding or removing files to region is considered valid now. fn is_valid_region_edit(edit: &RegionEdit) -> bool { !edit.files_to_add.is_empty() && edit.files_to_remove.is_empty() @@ -1054,6 +1070,18 @@ impl EngineInner { Ok(RemapManifestsResponse { new_manifests }) } + async fn copy_region_from( + &self, + region_id: RegionId, + request: CopyRegionFromRequest, + ) -> Result { + let (request, receiver) = + WorkerRequest::try_from_copy_region_from_request(region_id, request)?; + self.workers.submit_to_worker(region_id, request).await?; + let response = receiver.await.context(RecvSnafu)??; + Ok(response) + } + fn role(&self, region_id: RegionId) -> Option { self.workers.get_region(region_id).map(|region| { if region.is_follower() { @@ -1240,6 +1268,19 @@ impl RegionEngine for MitoEngine { .map_err(BoxedError::new) } + async fn copy_region_from( + &self, + _region_id: RegionId, + _request: CopyRegionFromRequest, + ) -> Result { + Err(BoxedError::new( + error::UnsupportedOperationSnafu { + err_msg: "copy_region_from is not supported", + } + .build(), + )) + } + fn role(&self, region_id: RegionId) -> Option { self.inner.role(region_id) } diff --git a/src/mito2/src/engine/copy_region_from_test.rs b/src/mito2/src/engine/copy_region_from_test.rs new file mode 100644 index 0000000000..c42e1fc781 --- /dev/null +++ b/src/mito2/src/engine/copy_region_from_test.rs @@ -0,0 +1,361 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::assert_matches::assert_matches; +use std::fs; +use std::sync::Arc; + +use api::v1::Rows; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use object_store::layers::mock::{Error as MockError, ErrorKind, MockLayerBuilder}; +use store_api::region_engine::{CopyRegionFromRequest, RegionEngine, RegionRole}; +use store_api::region_request::{RegionFlushRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::config::MitoConfig; +use crate::error::Error; +use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema}; + +#[tokio::test] +async fn test_engine_copy_region_from() { + common_telemetry::init_default_ut_logging(); + + test_engine_copy_region_from_with_format(true, true).await; + test_engine_copy_region_from_with_format(true, false).await; + test_engine_copy_region_from_with_format(false, true).await; + test_engine_copy_region_from_with_format(false, false).await; +} + +async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index: bool) { + let mut env = TestEnv::with_prefix("copy-region-from").await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + // Creates a source region and adds some data + let source_region_id = RegionId::new(1, 1); + let mut request = CreateRequestBuilder::new().build(); + if with_index { + request + .column_metadatas + .iter_mut() + .find(|c| c.column_schema.name == "tag_0") + .unwrap() + .column_schema + .set_inverted_index(true); + } + + let column_schemas = rows_schema(&request); + engine + .handle_request(source_region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 42), + }; + put_rows(&engine, source_region_id, rows).await; + engine + .handle_request( + source_region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + // Creates a target region and enters staging mode + let target_region_id = RegionId::new(1, 2); + engine + .handle_request(target_region_id, RegionRequest::Create(request)) + .await + .unwrap(); + common_telemetry::debug!("copy region from"); + let resp = engine + .copy_region_from( + target_region_id, + CopyRegionFromRequest { + source_region_id, + parallelism: 1, + }, + ) + .await + .unwrap(); + + let manifest = engine + .get_region(target_region_id) + .unwrap() + .manifest_ctx + .manifest() + .await; + assert!(!manifest.files.is_empty()); + for meta in manifest.files.values() { + assert_eq!(meta.region_id, target_region_id); + assert_eq!(meta.exists_index(), with_index); + } + + let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display()); + let source_region_files = collect_filename_in_dir(&source_region_dir); + let target_region_dir = format!("{}/data/test/1_0000000002", env.data_home().display()); + let target_region_files = collect_filename_in_dir(&target_region_dir); + assert_eq!(source_region_files, target_region_files); + + if with_index { + let source_region_index_files = + collect_filename_in_dir(&format!("{}/index", source_region_dir)); + let target_region_index_files = + collect_filename_in_dir(&format!("{}/index", target_region_dir)); + assert_eq!(source_region_index_files, target_region_index_files); + } + common_telemetry::debug!("copy region from again"); + let resp2 = engine + .copy_region_from( + target_region_id, + CopyRegionFromRequest { + source_region_id, + parallelism: 1, + }, + ) + .await + .unwrap(); + assert_eq!(resp.copied_file_ids, resp2.copied_file_ids); +} + +#[tokio::test] +async fn test_engine_copy_region_failure() { + common_telemetry::init_default_ut_logging(); + test_engine_copy_region_failure_with_format(false).await; + test_engine_copy_region_failure_with_format(true).await; +} + +async fn test_engine_copy_region_failure_with_format(flat_format: bool) { + let mock_layer = MockLayerBuilder::default() + .copy_interceptor(Arc::new(|from, _, _args| { + if from.contains(".puffin") { + Some(Err(MockError::new(ErrorKind::Unexpected, "mock err"))) + } else { + None + } + })) + .build() + .unwrap(); + let mut env = TestEnv::new().await.with_mock_layer(mock_layer); + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + // Creates a source region and adds some data + let source_region_id = RegionId::new(1, 1); + let mut request = CreateRequestBuilder::new().build(); + request + .column_metadatas + .iter_mut() + .find(|c| c.column_schema.name == "tag_0") + .unwrap() + .column_schema + .set_inverted_index(true); + + let column_schemas = rows_schema(&request); + engine + .handle_request(source_region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 42), + }; + put_rows(&engine, source_region_id, rows).await; + engine + .handle_request( + source_region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display()); + assert_file_num_in_dir(&source_region_dir, 1); + assert_file_num_in_dir(&format!("{}/index", source_region_dir), 1); + let source_region_files = collect_filename_in_dir(&source_region_dir); + let source_region_index_files = + collect_filename_in_dir(&format!("{}/index", source_region_dir)); + + // Creates a target region and enters staging mode + let target_region_id = RegionId::new(1, 2); + engine + .handle_request(target_region_id, RegionRequest::Create(request)) + .await + .unwrap(); + let err = engine + .copy_region_from( + target_region_id, + CopyRegionFromRequest { + source_region_id, + parallelism: 1, + }, + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::StorageUnavailable); + + // Check target region directory is empty + let target_region_dir = format!("{}/data/test/1_0000000002", env.data_home().display()); + assert_file_num_in_dir(&target_region_dir, 0); + assert!(!fs::exists(format!("{}/index", target_region_dir)).unwrap()); + + // Check source region directory is not affected + let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display()); + assert_file_num_in_dir(&source_region_dir, 1); + assert_file_num_in_dir(&format!("{}/index", source_region_dir), 1); + + assert_eq!( + source_region_files, + collect_filename_in_dir(&source_region_dir) + ); + assert_eq!( + source_region_index_files, + collect_filename_in_dir(&format!("{}/index", source_region_dir)) + ); +} + +fn assert_file_num_in_dir(dir: &str, expected_num: usize) { + let files = fs::read_dir(dir) + .unwrap() + .collect::, _>>() + .unwrap() + .into_iter() + .filter(|f| f.metadata().unwrap().is_file()) + .collect::>(); + assert_eq!( + files.len(), + expected_num, + "The number of files in the directory should be {}, got: {:?}", + expected_num, + files + ); +} + +fn collect_filename_in_dir(dir: &str) -> Vec { + let mut files = fs::read_dir(dir) + .unwrap() + .collect::, _>>() + .unwrap() + .into_iter() + .filter(|f| f.metadata().unwrap().is_file()) + .map(|f| { + f.path() + .to_string_lossy() + .rsplit("/") + .last() + .unwrap() + .to_string() + }) + .collect::>(); + files.sort_unstable(); + + files +} + +#[tokio::test] +async fn test_engine_copy_region_invalid_args() { + common_telemetry::init_default_ut_logging(); + test_engine_copy_region_invalid_args_with_format(false).await; + test_engine_copy_region_invalid_args_with_format(true).await; +} + +async fn test_engine_copy_region_invalid_args_with_format(flat_format: bool) { + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + let err = engine + .copy_region_from( + region_id, + CopyRegionFromRequest { + source_region_id: RegionId::new(2, 1), + parallelism: 1, + }, + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + let err = engine + .copy_region_from( + region_id, + CopyRegionFromRequest { + source_region_id: RegionId::new(1, 1), + parallelism: 1, + }, + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); +} + +#[tokio::test] +async fn test_engine_copy_region_unexpected_state() { + common_telemetry::init_default_ut_logging(); + test_engine_copy_region_unexpected_state_with_format(false).await; + test_engine_copy_region_unexpected_state_with_format(true).await; +} + +async fn test_engine_copy_region_unexpected_state_with_format(flat_format: bool) { + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + engine + .set_region_role(region_id, RegionRole::Follower) + .unwrap(); + + let err = engine + .copy_region_from( + region_id, + CopyRegionFromRequest { + source_region_id: RegionId::new(1, 2), + parallelism: 1, + }, + ) + .await + .unwrap_err(); + assert_matches!( + err.as_any().downcast_ref::().unwrap(), + Error::RegionState { .. } + ) +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b23e2340ec..cda2c75403 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1185,6 +1185,18 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display( + "Invalid source and target region, source: {}, target: {}", + source_region_id, + target_region_id + ))] + InvalidSourceAndTargetRegion { + source_region_id: RegionId, + target_region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1253,7 +1265,8 @@ impl ErrorExt for Error { | MissingManifest { .. } | NoOldManifests { .. } | MissingPartitionExpr { .. } - | SerializePartitionExpr { .. } => StatusCode::InvalidArguments, + | SerializePartitionExpr { .. } + | InvalidSourceAndTargetRegion { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 661c1d876c..e83a08ba74 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -17,6 +17,7 @@ pub mod catchup; pub mod opener; pub mod options; +pub mod utils; pub(crate) mod version; use std::collections::hash_map::Entry; @@ -34,9 +35,11 @@ use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState, }; +use store_api::region_request::PathType; use store_api::sst_entry::ManifestSstEntry; -use store_api::storage::{RegionId, SequenceNumber}; +use store_api::storage::{FileId, RegionId, SequenceNumber}; use tokio::sync::RwLockWriteGuard; +pub use utils::*; use crate::access_layer::AccessLayerRef; use crate::error::{ @@ -49,6 +52,7 @@ use crate::manifest::action::{ use crate::manifest::manager::RegionManifestManager; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::{OnFailure, OptionOutputTx}; +use crate::sst::file::FileMeta; use crate::sst::file_purger::FilePurgerRef; use crate::sst::location::{index_file_path, sst_file_path}; use crate::time_provider::TimeProviderRef; @@ -215,6 +219,11 @@ impl MitoRegion { self.access_layer.table_dir() } + /// Returns the path type of the region. + pub(crate) fn path_type(&self) -> PathType { + self.access_layer.path_type() + } + /// Returns whether the region is writable. pub(crate) fn is_writable(&self) -> bool { matches!( @@ -657,6 +666,16 @@ impl MitoRegion { .collect() } + /// Returns the file metas of the region by file ids. + pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec> { + let manifest_files = self.manifest_ctx.manifest().await.files.clone(); + + file_ids + .iter() + .map(|file_id| manifest_files.get(file_id).cloned()) + .collect::>() + } + /// Exit staging mode successfully by merging all staged manifests and making them visible. pub(crate) async fn exit_staging_on_success( &self, diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 128cf6bad6..60abdbd29b 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -636,54 +636,6 @@ pub fn get_object_store( } } -/// A loader for loading metadata from a region dir. -#[derive(Debug, Clone)] -pub struct RegionMetadataLoader { - config: Arc, - object_store_manager: ObjectStoreManagerRef, -} - -impl RegionMetadataLoader { - /// Creates a new `RegionOpenerBuilder`. - pub fn new(config: Arc, object_store_manager: ObjectStoreManagerRef) -> Self { - Self { - config, - object_store_manager, - } - } - - /// Loads the metadata of the region from the region dir. - pub async fn load( - &self, - region_dir: &str, - region_options: &RegionOptions, - ) -> Result> { - let manifest = self - .load_manifest(region_dir, ®ion_options.storage) - .await?; - Ok(manifest.map(|m| m.metadata.clone())) - } - - /// Loads the manifest of the region from the region dir. - pub async fn load_manifest( - &self, - region_dir: &str, - storage: &Option, - ) -> Result>> { - let object_store = get_object_store(storage, &self.object_store_manager)?; - let region_manifest_options = - RegionManifestOptions::new(&self.config, region_dir, &object_store); - let Some(manifest_manager) = - RegionManifestManager::open(region_manifest_options, &Default::default()).await? - else { - return Ok(None); - }; - - let manifest = manifest_manager.manifest(); - Ok(Some(manifest)) - } -} - /// Checks whether the recovered region has the same schema as region to create. pub(crate) fn check_recovered_region( recovered: &RegionMetadata, diff --git a/src/mito2/src/region/utils.rs b/src/mito2/src/region/utils.rs new file mode 100644 index 0000000000..25c084ef7a --- /dev/null +++ b/src/mito2/src/region/utils.rs @@ -0,0 +1,345 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Instant; + +use common_base::readable_size::ReadableSize; +use common_telemetry::{debug, error, info}; +use futures::future::try_join_all; +use object_store::manager::ObjectStoreManagerRef; +use snafu::{ResultExt, ensure}; +use store_api::metadata::RegionMetadataRef; +use store_api::region_request::PathType; +use store_api::storage::{FileId, IndexVersion, RegionId}; + +use crate::access_layer::AccessLayerRef; +use crate::config::MitoConfig; +use crate::error::{self, InvalidSourceAndTargetRegionSnafu, Result}; +use crate::manifest::action::RegionManifest; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; +use crate::region::opener::get_object_store; +use crate::region::options::RegionOptions; +use crate::sst::file::{RegionFileId, RegionIndexId}; +use crate::sst::location; + +/// A loader for loading metadata from a region dir. +#[derive(Debug, Clone)] +pub struct RegionMetadataLoader { + config: Arc, + object_store_manager: ObjectStoreManagerRef, +} + +impl RegionMetadataLoader { + /// Creates a new `RegionMetadataLoader`. + pub fn new(config: Arc, object_store_manager: ObjectStoreManagerRef) -> Self { + Self { + config, + object_store_manager, + } + } + + /// Loads the metadata of the region from the region dir. + pub async fn load( + &self, + region_dir: &str, + region_options: &RegionOptions, + ) -> Result> { + let manifest = self + .load_manifest(region_dir, ®ion_options.storage) + .await?; + Ok(manifest.map(|m| m.metadata.clone())) + } + + /// Loads the manifest of the region from the region dir. + pub async fn load_manifest( + &self, + region_dir: &str, + storage: &Option, + ) -> Result>> { + let object_store = get_object_store(storage, &self.object_store_manager)?; + let region_manifest_options = + RegionManifestOptions::new(&self.config, region_dir, &object_store); + let Some(manifest_manager) = + RegionManifestManager::open(region_manifest_options, &Default::default()).await? + else { + return Ok(None); + }; + + let manifest = manifest_manager.manifest(); + Ok(Some(manifest)) + } +} + +/// A copier for copying files from a region to another region. +#[derive(Debug, Clone)] +pub struct RegionFileCopier { + access_layer: AccessLayerRef, +} + +/// A descriptor for a file. +#[derive(Debug, Clone, Copy)] +pub enum FileDescriptor { + /// An index file. + Index { + file_id: FileId, + version: IndexVersion, + size: u64, + }, + /// A data file. + Data { file_id: FileId, size: u64 }, +} + +impl FileDescriptor { + pub fn size(&self) -> u64 { + match self { + FileDescriptor::Index { size, .. } => *size, + FileDescriptor::Data { size, .. } => *size, + } + } +} + +/// Builds the source and target file paths for a given file descriptor. +/// +/// # Arguments +/// +/// * `source_region_id`: The ID of the source region. +/// * `target_region_id`: The ID of the target region. +/// * `file_id`: The ID of the file. +/// +/// # Returns +/// +/// A tuple containing the source and target file paths. +fn build_copy_file_paths( + source_region_id: RegionId, + target_region_id: RegionId, + file_descriptor: FileDescriptor, + table_dir: &str, + path_type: PathType, +) -> (String, String) { + match file_descriptor { + FileDescriptor::Index { + file_id, version, .. + } => ( + location::index_file_path( + table_dir, + RegionIndexId::new(RegionFileId::new(source_region_id, file_id), version), + path_type, + ), + location::index_file_path( + table_dir, + RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version), + path_type, + ), + ), + FileDescriptor::Data { file_id, .. } => ( + location::sst_file_path( + table_dir, + RegionFileId::new(source_region_id, file_id), + path_type, + ), + location::sst_file_path( + table_dir, + RegionFileId::new(target_region_id, file_id), + path_type, + ), + ), + } +} + +fn build_delete_file_path( + target_region_id: RegionId, + file_descriptor: FileDescriptor, + table_dir: &str, + path_type: PathType, +) -> String { + match file_descriptor { + FileDescriptor::Index { + file_id, version, .. + } => location::index_file_path( + table_dir, + RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version), + path_type, + ), + FileDescriptor::Data { file_id, .. } => location::sst_file_path( + table_dir, + RegionFileId::new(target_region_id, file_id), + path_type, + ), + } +} + +impl RegionFileCopier { + pub fn new(access_layer: AccessLayerRef) -> Self { + Self { access_layer } + } + + /// Copies files from a source region to a target region. + /// + /// # Arguments + /// + /// * `source_region_id`: The ID of the source region. + /// * `target_region_id`: The ID of the target region. + /// * `file_ids`: The IDs of the files to copy. + pub async fn copy_files( + &self, + source_region_id: RegionId, + target_region_id: RegionId, + file_ids: Vec, + parallelism: usize, + ) -> Result<()> { + ensure!( + source_region_id.table_id() == target_region_id.table_id(), + InvalidSourceAndTargetRegionSnafu { + source_region_id, + target_region_id, + }, + ); + let table_dir = self.access_layer.table_dir(); + let path_type = self.access_layer.path_type(); + let object_store = self.access_layer.object_store(); + + info!( + "Copying {} files from region {} to region {}", + file_ids.len(), + source_region_id, + target_region_id + ); + debug!( + "Copying files: {:?} from region {} to region {}", + file_ids, source_region_id, target_region_id + ); + let mut tasks = Vec::with_capacity(parallelism); + for skip in 0..parallelism { + let target_file_ids = file_ids.iter().skip(skip).step_by(parallelism).copied(); + let object_store = object_store.clone(); + tasks.push(async move { + for file_desc in target_file_ids { + let (source_path, target_path) = build_copy_file_paths( + source_region_id, + target_region_id, + file_desc, + table_dir, + path_type, + ); + let now = Instant::now(); + object_store + .copy(&source_path, &target_path) + .await + .inspect_err( + |e| error!(e; "Failed to copy file {} to {}", source_path, target_path), + ) + .context(error::OpenDalSnafu)?; + let file_size = ReadableSize(file_desc.size()); + info!( + "Copied file {} to {}, file size: {}, elapsed: {:?}", + source_path, + target_path, + file_size, + now.elapsed(), + ); + } + + Ok(()) + }); + } + + if let Err(err) = try_join_all(tasks).await { + error!(err; "Failed to copy files from region {} to region {}", source_region_id, target_region_id); + self.clean_target_region(target_region_id, file_ids).await; + return Err(err); + } + + Ok(()) + } + + /// Cleans the copied files from the target region. + async fn clean_target_region(&self, target_region_id: RegionId, file_ids: Vec) { + let table_dir = self.access_layer.table_dir(); + let path_type = self.access_layer.path_type(); + let object_store = self.access_layer.object_store(); + let delete_file_path = file_ids + .into_iter() + .map(|file_descriptor| { + build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type) + }) + .collect::>(); + debug!( + "Deleting files: {:?} after failed to copy files to target region {}", + delete_file_path, target_region_id + ); + if let Err(err) = object_store.delete_iter(delete_file_path).await { + error!(err; "Failed to delete files from region {}", target_region_id); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_build_copy_file_paths() { + common_telemetry::init_default_ut_logging(); + let file_id = FileId::random(); + let source_region_id = RegionId::new(1, 1); + let target_region_id = RegionId::new(1, 2); + let file_descriptor = FileDescriptor::Data { file_id, size: 100 }; + let table_dir = "/table_dir"; + let path_type = PathType::Bare; + let (source_path, target_path) = build_copy_file_paths( + source_region_id, + target_region_id, + file_descriptor, + table_dir, + path_type, + ); + assert_eq!( + source_path, + format!("/table_dir/1_0000000001/{}.parquet", file_id) + ); + assert_eq!( + target_path, + format!("/table_dir/1_0000000002/{}.parquet", file_id) + ); + + let version = 1; + let file_descriptor = FileDescriptor::Index { + file_id, + version, + size: 100, + }; + let (source_path, target_path) = build_copy_file_paths( + source_region_id, + target_region_id, + file_descriptor, + table_dir, + path_type, + ); + assert_eq!( + source_path, + format!( + "/table_dir/1_0000000001/index/{}.{}.puffin", + file_id, version + ) + ); + assert_eq!( + target_path, + format!( + "/table_dir/1_0000000002/index/{}.{}.puffin", + file_id, version + ) + ); + } +} diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 39a0d3a3f8..4f4aaeb4bc 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -33,7 +33,9 @@ use snafu::{OptionExt, ResultExt, ensure}; use store_api::ManifestVersion; use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint}; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; -use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; +use store_api::region_engine::{ + MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState, +}; use store_api::region_request::{ AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, @@ -605,6 +607,9 @@ pub(crate) enum WorkerRequest { /// Remap manifests request. RemapManifests(RemapManifestsRequest), + + /// Copy region from request. + CopyRegionFrom(CopyRegionFromRequest), } impl WorkerRequest { @@ -813,6 +818,24 @@ impl WorkerRequest { Ok((WorkerRequest::RemapManifests(request), receiver)) } + + /// Converts [CopyRegionFromRequest] from a [CopyRegionFromRequest](store_api::region_engine::CopyRegionFromRequest). + pub(crate) fn try_from_copy_region_from_request( + region_id: RegionId, + store_api::region_engine::CopyRegionFromRequest { + source_region_id, + parallelism, + }: store_api::region_engine::CopyRegionFromRequest, + ) -> Result<(WorkerRequest, Receiver>)> { + let (sender, receiver) = oneshot::channel(); + let request = CopyRegionFromRequest { + region_id, + source_region_id, + parallelism, + sender, + }; + Ok((WorkerRequest::CopyRegionFrom(request), receiver)) + } } /// DDL request to a region. @@ -867,6 +890,8 @@ pub(crate) enum BackgroundNotify { RegionEdit(RegionEditResult), /// Enter staging result. EnterStaging(EnterStagingResult), + /// Copy region result. + CopyRegionFromFinished(CopyRegionFromFinished), } /// Notifies a flush job is finished. @@ -1023,6 +1048,16 @@ pub(crate) struct EnterStagingResult { pub(crate) result: Result<()>, } +#[derive(Debug)] +pub(crate) struct CopyRegionFromFinished { + /// Region id. + pub(crate) region_id: RegionId, + /// Region edit to apply. + pub(crate) edit: RegionEdit, + /// Result sender. + pub(crate) sender: Sender>, +} + /// Request to edit a region directly. #[derive(Debug)] pub(crate) struct RegionEditRequest { @@ -1077,6 +1112,18 @@ pub(crate) struct RemapManifestsRequest { pub(crate) sender: Sender>>, } +#[derive(Debug)] +pub(crate) struct CopyRegionFromRequest { + /// The [`RegionId`] of the target region. + pub(crate) region_id: RegionId, + /// The [`RegionId`] of the source region. + pub(crate) source_region_id: RegionId, + /// The parallelism of the copy operation. + pub(crate) parallelism: usize, + /// Result sender. + pub(crate) sender: Sender>, +} + #[cfg(test)] mod tests { use api::v1::value::ValueData; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index b398f92f42..27d92a6610 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -19,6 +19,7 @@ mod handle_bulk_insert; mod handle_catchup; mod handle_close; mod handle_compaction; +mod handle_copy_region; mod handle_create; mod handle_drop; mod handle_enter_staging; @@ -1039,6 +1040,9 @@ impl RegionWorkerLoop { WorkerRequest::RemapManifests(req) => { self.handle_remap_manifests_request(req); } + WorkerRequest::CopyRegionFrom(req) => { + self.handle_copy_region_from_request(req); + } } } @@ -1154,6 +1158,9 @@ impl RegionWorkerLoop { } BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await, BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await, + BackgroundNotify::CopyRegionFromFinished(req) => { + self.handle_copy_region_from_finished(req) + } } } diff --git a/src/mito2/src/worker/handle_copy_region.rs b/src/mito2/src/worker/handle_copy_region.rs new file mode 100644 index 0000000000..e929013fc6 --- /dev/null +++ b/src/mito2/src/worker/handle_copy_region.rs @@ -0,0 +1,245 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::{debug, error, info}; +use snafu::OptionExt; +use store_api::region_engine::MitoCopyRegionFromResponse; +use store_api::storage::{FileId, RegionId}; + +use crate::error::{InvalidRequestSnafu, MissingManifestSnafu, Result}; +use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::region::{FileDescriptor, MitoRegionRef, RegionFileCopier, RegionMetadataLoader}; +use crate::request::{ + BackgroundNotify, CopyRegionFromFinished, CopyRegionFromRequest, WorkerRequest, +}; +use crate::sst::location::region_dir_from_table_dir; +use crate::worker::{RegionWorkerLoop, WorkerRequestWithTime}; + +impl RegionWorkerLoop { + pub(crate) fn handle_copy_region_from_request(&mut self, request: CopyRegionFromRequest) { + let region_id = request.region_id; + let source_region_id = request.source_region_id; + let sender = request.sender; + let region = match self.regions.writable_region(region_id) { + Ok(region) => region, + Err(e) => { + let _ = sender.send(Err(e)); + return; + } + }; + + let same_table = source_region_id.table_id() == region_id.table_id(); + if !same_table { + let _ = sender.send( + InvalidRequestSnafu { + region_id, + reason: format!("Source and target regions must be from the same table, source_region_id: {source_region_id}, target_region_id: {region_id}"), + } + .fail(), + ); + return; + } + if source_region_id == region_id { + let _ = sender.send( + InvalidRequestSnafu { + region_id, + reason: format!("Source and target regions must be different, source_region_id: {source_region_id}, target_region_id: {region_id}"), + } + .fail(), + ); + return; + } + + let region_metadata_loader = + RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone()); + let worker_sender = self.sender.clone(); + + common_runtime::spawn_global(async move { + let (region_edit, file_ids) = match Self::copy_region_from( + ®ion, + region_metadata_loader, + source_region_id, + region_id, + request.parallelism.max(1), + ) + .await + { + Ok(region_files) => region_files, + Err(e) => { + let _ = sender.send(Err(e)); + return; + } + }; + + match region_edit { + Some(region_edit) => { + if let Err(e) = worker_sender + .send(WorkerRequestWithTime::new(WorkerRequest::Background { + region_id, + notify: BackgroundNotify::CopyRegionFromFinished( + CopyRegionFromFinished { + region_id, + edit: region_edit, + sender, + }, + ), + })) + .await + { + error!(e; "Failed to send copy region from finished notification to worker, region_id: {}", region_id); + } + } + None => { + let _ = sender.send(Ok(MitoCopyRegionFromResponse { + copied_file_ids: file_ids, + })); + } + } + }); + } + + pub(crate) fn handle_copy_region_from_finished(&mut self, request: CopyRegionFromFinished) { + let region_id = request.region_id; + let sender = request.sender; + let region = match self.regions.writable_region(region_id) { + Ok(region) => region, + Err(e) => { + let _ = sender.send(Err(e)); + return; + } + }; + + let copied_file_ids = request + .edit + .files_to_add + .iter() + .map(|file_meta| file_meta.file_id) + .collect(); + + region + .version_control + .apply_edit(Some(request.edit), &[], region.file_purger.clone()); + + let _ = sender.send(Ok(MitoCopyRegionFromResponse { copied_file_ids })); + } + + /// Returns the region edit and the file ids that were copied from the source region to the target region. + /// + /// If no need to copy files, returns (None, file_ids). + async fn copy_region_from( + region: &MitoRegionRef, + region_metadata_loader: RegionMetadataLoader, + source_region_id: RegionId, + target_region_id: RegionId, + parallelism: usize, + ) -> Result<(Option, Vec)> { + let table_dir = region.table_dir(); + let path_type = region.path_type(); + let region_dir = region_dir_from_table_dir(table_dir, source_region_id, path_type); + info!( + "Loading source region manifest from region dir: {region_dir}, target region: {target_region_id}" + ); + let source_region_manifest = region_metadata_loader + .load_manifest(®ion_dir, ®ion.version().options.storage) + .await? + .context(MissingManifestSnafu { + region_id: source_region_id, + })?; + let mut files_to_copy = vec![]; + let target_region_manifest = region.manifest_ctx.manifest().await; + let file_ids = source_region_manifest + .files + .keys() + .cloned() + .collect::>(); + debug!( + "source region files: {:?}, source region id: {}", + source_region_manifest.files, source_region_id + ); + for (file_id, file_meta) in &source_region_manifest.files { + if !target_region_manifest.files.contains_key(file_id) { + let mut new_file_meta = file_meta.clone(); + new_file_meta.region_id = target_region_id; + files_to_copy.push(new_file_meta); + } + } + if files_to_copy.is_empty() { + return Ok((None, file_ids)); + } + + let file_descriptors = files_to_copy + .iter() + .flat_map(|file_meta| { + if file_meta.exists_index() { + let region_index_id = file_meta.index_id(); + let file_id = region_index_id.file_id.file_id(); + let version = region_index_id.version; + let file_size = file_meta.file_size; + let index_file_size = file_meta.index_file_size(); + vec![ + FileDescriptor::Data { + file_id: file_meta.file_id, + size: file_size, + }, + FileDescriptor::Index { + file_id, + version, + size: index_file_size, + }, + ] + } else { + let file_size = file_meta.file_size; + vec![FileDescriptor::Data { + file_id: file_meta.file_id, + size: file_size, + }] + } + }) + .collect(); + debug!("File descriptors to copy: {:?}", file_descriptors); + let copier = RegionFileCopier::new(region.access_layer()); + // TODO(weny): ensure the target region is empty. + copier + .copy_files( + source_region_id, + target_region_id, + file_descriptors, + parallelism, + ) + .await?; + let edit = RegionEdit { + files_to_add: files_to_copy, + files_to_remove: vec![], + timestamp_ms: Some(chrono::Utc::now().timestamp_millis()), + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + committed_sequence: None, + }; + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + info!("Applying {edit:?} to region {target_region_id}, reason: CopyRegionFrom"); + let version = region + .manifest_ctx + .manifest_manager + .write() + .await + .update(action_list, false) + .await?; + info!( + "Successfully update manifest version to {version}, region: {target_region_id}, reason: CopyRegionFrom" + ); + + Ok((Some(edit), file_ids)) + } +} diff --git a/src/mito2/src/worker/handle_remap.rs b/src/mito2/src/worker/handle_remap.rs index 3039e04a1c..5e94221f7d 100644 --- a/src/mito2/src/worker/handle_remap.rs +++ b/src/mito2/src/worker/handle_remap.rs @@ -20,13 +20,11 @@ use common_telemetry::info; use futures::future::try_join_all; use partition::expr::PartitionExpr; use snafu::{OptionExt, ResultExt}; -use store_api::region_request::PathType; use store_api::storage::RegionId; use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result}; use crate::manifest::action::RegionManifest; -use crate::region::MitoRegionRef; -use crate::region::opener::RegionMetadataLoader; +use crate::region::{MitoRegionRef, RegionMetadataLoader}; use crate::remap_manifest::RemapManifest; use crate::request::RemapManifestsRequest; use crate::sst::location::region_dir_from_table_dir; @@ -87,10 +85,10 @@ impl RegionWorkerLoop { let mut tasks = Vec::with_capacity(input_regions.len()); let region_options = region.version().options.clone(); let table_dir = region.table_dir(); - + let path_type = region.path_type(); let now = Instant::now(); for input_region in &input_regions { - let region_dir = region_dir_from_table_dir(table_dir, *input_region, PathType::Bare); + let region_dir = region_dir_from_table_dir(table_dir, *input_region, path_type); let storage = region_options.storage.clone(); let moved_region_metadata_loader = region_metadata_loader.clone(); tasks.push(async move { diff --git a/src/object-store/src/layers/mock.rs b/src/object-store/src/layers/mock.rs index 0ee0f73b21..e55af3bfe0 100644 --- a/src/object-store/src/layers/mock.rs +++ b/src/object-store/src/layers/mock.rs @@ -21,12 +21,14 @@ pub use opendal::raw::{ Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, oio, }; +use opendal::raw::{OpCopy, RpCopy}; pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result}; pub type MockWriterFactory = Arc oio::Writer + Send + Sync>; pub type MockReaderFactory = Arc oio::Reader + Send + Sync>; pub type MockListerFactory = Arc oio::Lister + Send + Sync>; pub type MockDeleterFactory = Arc oio::Deleter + Send + Sync>; +pub type CopyInterceptor = Arc Option> + Send + Sync>; #[derive(Builder)] pub struct MockLayer { @@ -38,6 +40,8 @@ pub struct MockLayer { lister_factory: Option, #[builder(setter(strip_option), default)] deleter_factory: Option, + #[builder(setter(strip_option), default)] + copy_interceptor: Option, } impl Clone for MockLayer { @@ -47,6 +51,7 @@ impl Clone for MockLayer { reader_factory: self.reader_factory.clone(), lister_factory: self.lister_factory.clone(), deleter_factory: self.deleter_factory.clone(), + copy_interceptor: self.copy_interceptor.clone(), } } } @@ -61,6 +66,7 @@ impl Layer for MockLayer { reader_factory: self.reader_factory.clone(), lister_factory: self.lister_factory.clone(), deleter_factory: self.deleter_factory.clone(), + copy_interceptor: self.copy_interceptor.clone(), } } } @@ -71,6 +77,7 @@ pub struct MockAccessor { reader_factory: Option, lister_factory: Option, deleter_factory: Option, + copy_interceptor: Option, } impl Debug for MockAccessor { @@ -214,4 +221,16 @@ impl LayeredAccess for MockAccessor { }) } } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { + let Some(copy_interceptor) = self.copy_interceptor.as_ref() else { + return self.inner.copy(from, to, args).await; + }; + + let Some(result) = copy_interceptor(from, to, args.clone()) else { + return self.inner.copy(from, to, args).await; + }; + + result + } } diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index 5c24915bde..8258b13490 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -28,9 +28,9 @@ use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; use store_api::region_engine::{ - RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, - SettableRegionRoleState, SyncManifestResponse, + CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole, + RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, + SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::RegionRequest; use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber}; @@ -125,6 +125,14 @@ impl RegionEngine for MetaRegionEngine { unimplemented!() } + async fn copy_region_from( + &self, + _region_id: RegionId, + _request: CopyRegionFromRequest, + ) -> Result { + unimplemented!() + } + fn role(&self, _region_id: RegionId) -> Option { None } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 8ef5d0596c..dd7809a0bf 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -37,7 +37,7 @@ use crate::metadata::RegionMetadataRef; use crate::region_request::{ BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest, }; -use crate::storage::{RegionId, ScanRequest, SequenceNumber}; +use crate::storage::{FileId, RegionId, ScanRequest, SequenceNumber}; /// The settable region role state. #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -713,6 +713,52 @@ pub struct RemapManifestsResponse { pub new_manifests: HashMap, } +/// Request to copy files from a source region to a target region. +#[derive(Debug, Clone)] +pub struct CopyRegionFromRequest { + /// The [`RegionId`] of the source region. + pub source_region_id: RegionId, + /// The parallelism of the copy operation. + pub parallelism: usize, +} + +#[derive(Debug, Clone)] +pub struct MitoCopyRegionFromResponse { + /// The file ids that were copied from the source region to the target region. + pub copied_file_ids: Vec, +} + +#[derive(Debug, Clone)] +pub struct MetricCopyRegionFromResponse { + /// The logical regions that were newly opened after the copy operation. + pub new_opened_logical_region_ids: Vec, +} + +/// Response to copy region from a source region to a target region. +#[derive(Debug, Clone)] +pub enum CopyRegionFromResponse { + Mito(MitoCopyRegionFromResponse), + Metric(MetricCopyRegionFromResponse), +} + +impl CopyRegionFromResponse { + /// Converts the response to a mito2 response. + pub fn into_mito(self) -> Option { + match self { + CopyRegionFromResponse::Mito(response) => Some(response), + CopyRegionFromResponse::Metric(_) => None, + } + } + + /// Converts the response to a metric response. + pub fn into_metric(self) -> Option { + match self { + CopyRegionFromResponse::Metric(response) => Some(response), + CopyRegionFromResponse::Mito(_) => None, + } + } +} + #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine @@ -843,6 +889,13 @@ pub trait RegionEngine: Send + Sync { request: RemapManifestsRequest, ) -> Result; + /// Copies region from a source region to a target region. + async fn copy_region_from( + &self, + region_id: RegionId, + request: CopyRegionFromRequest, + ) -> Result; + /// Sets region role state gracefully. /// /// After the call returns, the engine ensures no more write operations will succeed in the region.