diff --git a/Cargo.lock b/Cargo.lock index a64cbdeb5d..731e1c81d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7622,6 +7622,7 @@ dependencies = [ "async-trait", "base64 0.22.1", "bytes", + "chrono", "common-base", "common-error", "common-macro", diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 9261c0cca8..0d710cc0bf 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -66,7 +66,7 @@ use store_api::metric_engine_consts::{ }; use store_api::region_engine::{ RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse, - SettableRegionRoleState, + SettableRegionRoleState, SyncRegionFromRequest, }; use store_api::region_request::{ AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest, @@ -536,10 +536,13 @@ impl RegionServer { let tracing_context = TracingContext::from_current_span(); let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request")); - self.sync_region(region_id, manifest_info) - .trace(span) - .await - .map(|_| RegionResponse::new(AffectedRows::default())) + self.sync_region( + region_id, + SyncRegionFromRequest::from_manifest(manifest_info), + ) + .trace(span) + .await + .map(|_| RegionResponse::new(AffectedRows::default())) } /// Handles the ListMetadata request and retrieves metadata for specified regions. @@ -588,7 +591,7 @@ impl RegionServer { pub async fn sync_region( &self, region_id: RegionId, - manifest_info: RegionManifestInfo, + request: SyncRegionFromRequest, ) -> Result<()> { let engine_with_status = self .inner @@ -597,7 +600,7 @@ impl RegionServer { .with_context(|| RegionNotFoundSnafu { region_id })?; self.inner - .handle_sync_region(engine_with_status.engine(), region_id, manifest_info) + .handle_sync_region(engine_with_status.engine(), region_id, request) .await } @@ -1269,10 +1272,10 @@ impl RegionServerInner { &self, engine: &RegionEngineRef, region_id: RegionId, - manifest_info: RegionManifestInfo, + request: SyncRegionFromRequest, ) -> Result<()> { let Some(new_opened_regions) = engine - .sync_region(region_id, manifest_info) + .sync_region(region_id, request) .await .with_context(|_| HandleRegionRequestSnafu { region_id })? .new_opened_logical_region_ids() diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 3fe4954aea..3486d7b416 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -33,9 +33,9 @@ use servers::grpc::FlightCompression; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole, - RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, - SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest, + RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState, + SyncRegionFromRequest, SyncRegionFromResponse, }; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -287,8 +287,8 @@ impl RegionEngine for MockRegionEngine { async fn sync_region( &self, _region_id: RegionId, - _manifest_info: RegionManifestInfo, - ) -> Result { + _request: SyncRegionFromRequest, + ) -> Result { unimplemented!() } @@ -299,14 +299,6 @@ impl RegionEngine for MockRegionEngine { unimplemented!() } - async fn copy_region_from( - &self, - _region_id: RegionId, - _request: CopyRegionFromRequest, - ) -> Result { - unimplemented!() - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 5dd787b919..693ae325df 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -26,10 +26,9 @@ use object_store::ObjectStore; use snafu::{OptionExt, ensure}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole, - RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, - SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, - SinglePartitionScanner, SyncManifestResponse, + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest, + RemapManifestsResponse, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, + SettableRegionRoleState, SinglePartitionScanner, SyncRegionFromRequest, SyncRegionFromResponse, }; use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, @@ -145,10 +144,10 @@ impl RegionEngine for FileRegionEngine { async fn sync_region( &self, _region_id: RegionId, - _manifest_info: RegionManifestInfo, - ) -> Result { + _request: SyncRegionFromRequest, + ) -> Result { // File engine doesn't need to sync region manifest. - Ok(SyncManifestResponse::NotSupported) + Ok(SyncRegionFromResponse::NotSupported) } async fn remap_manifests( @@ -163,19 +162,6 @@ impl RegionEngine for FileRegionEngine { )) } - async fn copy_region_from( - &self, - _region_id: RegionId, - _request: CopyRegionFromRequest, - ) -> Result { - Err(BoxedError::new( - UnsupportedSnafu { - operation: "copy_region_from", - } - .build(), - )) - } - fn role(&self, region_id: RegionId) -> Option { self.inner.state(region_id) } diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 9beadade16..dbbbff8fde 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -23,6 +23,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true +chrono.workspace = true datafusion.workspace = true datatypes.workspace = true futures-util.workspace = true diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 1a250243e6..5798271ef8 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -43,10 +43,10 @@ pub(crate) use state::MetricEngineState; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ - BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, - RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest, - RemapManifestsResponse, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, - SettableRegionRoleState, SyncManifestResponse, + BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, + RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, + SetRegionRoleStateSuccess, SettableRegionRoleState, SyncRegionFromRequest, + SyncRegionFromResponse, }; use store_api::region_request::{ BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest, @@ -361,12 +361,30 @@ impl RegionEngine for MetricEngine { async fn sync_region( &self, region_id: RegionId, - manifest_info: RegionManifestInfo, - ) -> Result { - self.inner - .sync_region(region_id, manifest_info) - .await - .map_err(BoxedError::new) + request: SyncRegionFromRequest, + ) -> Result { + match request { + SyncRegionFromRequest::FromManifest(manifest_info) => self + .inner + .sync_region_from_manifest(region_id, manifest_info) + .await + .map_err(BoxedError::new), + SyncRegionFromRequest::FromRegion { + source_region_id, + parallelism, + } => { + if self.inner.is_physical_region(region_id) { + self.inner + .sync_region_from_region(region_id, source_region_id, parallelism) + .await + .map_err(BoxedError::new) + } else { + Err(BoxedError::new( + error::UnsupportedSyncRegionFromRequestSnafu { region_id }.build(), + )) + } + } + } } async fn remap_manifests( @@ -383,14 +401,6 @@ impl RegionEngine for MetricEngine { } } - async fn copy_region_from( - &self, - _region_id: RegionId, - _request: CopyRegionFromRequest, - ) -> Result { - todo!() - } - async fn set_region_role_state_gracefully( &self, region_id: RegionId, diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 44302a56bb..70e3280183 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -290,6 +290,11 @@ impl MetricEngineInner { .metadata_region .logical_regions(physical_region_id) .await?; + common_telemetry::debug!( + "Recover states for physical region {}, logical regions: {:?}", + physical_region_id, + logical_regions + ); let physical_columns = self .data_region .physical_columns(physical_region_id) diff --git a/src/metric-engine/src/engine/sync.rs b/src/metric-engine/src/engine/sync.rs index 4a2741c12b..315b63f98a 100644 --- a/src/metric-engine/src/engine/sync.rs +++ b/src/metric-engine/src/engine/sync.rs @@ -12,242 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Instant; - -use common_telemetry::info; -use snafu::{OptionExt, ResultExt, ensure}; -use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncManifestResponse}; -use store_api::storage::RegionId; - -use crate::engine::MetricEngineInner; -use crate::error::{ - MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result, -}; -use crate::utils; - -impl MetricEngineInner { - pub async fn sync_region( - &self, - region_id: RegionId, - manifest_info: RegionManifestInfo, - ) -> Result { - ensure!( - manifest_info.is_metric(), - MetricManifestInfoSnafu { region_id } - ); - - let metadata_region_id = utils::to_metadata_region_id(region_id); - // checked by ensure above - let metadata_manifest_version = manifest_info - .metadata_manifest_version() - .unwrap_or_default(); - let metadata_flushed_entry_id = manifest_info - .metadata_flushed_entry_id() - .unwrap_or_default(); - let metadata_region_manifest = - RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id, 0); - let metadata_synced = self - .mito - .sync_region(metadata_region_id, metadata_region_manifest) - .await - .context(MitoSyncOperationSnafu)? - .is_data_synced(); - - let data_region_id = utils::to_data_region_id(region_id); - let data_manifest_version = manifest_info.data_manifest_version(); - let data_flushed_entry_id = manifest_info.data_flushed_entry_id(); - let data_region_manifest = - RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id, 0); - - let data_synced = self - .mito - .sync_region(data_region_id, data_region_manifest) - .await - .context(MitoSyncOperationSnafu)? - .is_data_synced(); - - if !metadata_synced { - return Ok(SyncManifestResponse::Metric { - metadata_synced, - data_synced, - new_opened_logical_region_ids: vec![], - }); - } - - let now = Instant::now(); - // Recovers the states from the metadata region - // if the metadata manifest version is updated. - let physical_region_options = *self - .state - .read() - .unwrap() - .physical_region_states() - .get(&data_region_id) - .context(PhysicalRegionNotFoundSnafu { - region_id: data_region_id, - })? - .options(); - let new_opened_logical_region_ids = self - .recover_states(data_region_id, physical_region_options) - .await?; - info!( - "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}", - data_region_id, - now.elapsed(), - new_opened_logical_region_ids - ); - - Ok(SyncManifestResponse::Metric { - metadata_synced, - data_synced, - new_opened_logical_region_ids, - }) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use api::v1::SemanticType; - use common_query::prelude::greptime_timestamp; - use common_telemetry::info; - use datatypes::data_type::ConcreteDataType; - use datatypes::schema::ColumnSchema; - use store_api::metadata::ColumnMetadata; - use store_api::region_engine::{RegionEngine, RegionManifestInfo}; - use store_api::region_request::{ - AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest, - }; - use store_api::storage::RegionId; - - use crate::metadata_region::MetadataRegion; - use crate::test_util::TestEnv; - - #[tokio::test] - async fn test_sync_region_with_new_created_logical_regions() { - common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await; - env.init_metric_region().await; - - info!("creating follower engine"); - // Create a follower engine. - let (_follower_mito, follower_metric) = env.create_follower_engine().await; - - let physical_region_id = env.default_physical_region_id(); - - // Flushes the physical region - let metric_engine = env.metric(); - metric_engine - .handle_request( - env.default_physical_region_id(), - RegionRequest::Flush(RegionFlushRequest::default()), - ) - .await - .unwrap(); - - let response = follower_metric - .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0)) - .await - .unwrap(); - assert!(response.is_metric()); - let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap(); - assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]); - - // Sync again, no new logical region should be opened - let response = follower_metric - .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0)) - .await - .unwrap(); - assert!(response.is_metric()); - let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap(); - assert!(new_opened_logical_region_ids.is_empty()); - } - - fn test_alter_logical_region_request() -> RegionAlterRequest { - RegionAlterRequest { - kind: AlterKind::AddColumns { - columns: vec![AddColumn { - column_metadata: ColumnMetadata { - column_id: 0, - semantic_type: SemanticType::Tag, - column_schema: ColumnSchema::new( - "tag1", - ConcreteDataType::string_datatype(), - false, - ), - }, - location: None, - }], - }, - } - } - - #[tokio::test] - async fn test_sync_region_alter_alter_logical_region() { - common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await; - env.init_metric_region().await; - - info!("creating follower engine"); - let physical_region_id = env.default_physical_region_id(); - // Flushes the physical region - let metric_engine = env.metric(); - metric_engine - .handle_request( - env.default_physical_region_id(), - RegionRequest::Flush(RegionFlushRequest::default()), - ) - .await - .unwrap(); - - // Create a follower engine. - let (follower_mito, follower_metric) = env.create_follower_engine().await; - let metric_engine = env.metric(); - let engine_inner = env.metric().inner; - let region_id = env.default_logical_region_id(); - let request = test_alter_logical_region_request(); - - engine_inner - .alter_logical_regions( - physical_region_id, - vec![(region_id, request)], - &mut HashMap::new(), - ) - .await - .unwrap(); - - // Flushes the physical region - metric_engine - .handle_request( - env.default_physical_region_id(), - RegionRequest::Flush(RegionFlushRequest::default()), - ) - .await - .unwrap(); - - // Sync the follower engine - let response = follower_metric - .sync_region(physical_region_id, RegionManifestInfo::metric(2, 0, 2, 0)) - .await - .unwrap(); - assert!(response.is_metric()); - let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap(); - assert!(new_opened_logical_region_ids.is_empty()); - - let logical_region_id = env.default_logical_region_id(); - let metadata_region = MetadataRegion::new(follower_mito.clone()); - let semantic_type = metadata_region - .column_semantic_type(physical_region_id, logical_region_id, "tag1") - .await - .unwrap() - .unwrap(); - assert_eq!(semantic_type, SemanticType::Tag); - let timestamp_index = metadata_region - .column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp()) - .await - .unwrap() - .unwrap(); - assert_eq!(timestamp_index, SemanticType::Timestamp); - } -} +mod manifest; +mod region; diff --git a/src/metric-engine/src/engine/sync/manifest.rs b/src/metric-engine/src/engine/sync/manifest.rs new file mode 100644 index 0000000000..4574a07a6c --- /dev/null +++ b/src/metric-engine/src/engine/sync/manifest.rs @@ -0,0 +1,268 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Instant; + +use common_telemetry::info; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncRegionFromResponse}; +use store_api::storage::RegionId; + +use crate::engine::MetricEngineInner; +use crate::error::{ + MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result, +}; +use crate::utils; + +impl MetricEngineInner { + /// Syncs the region from the given manifest information (leader-follower scenario). + /// + /// This operation: + /// 1. Syncs the metadata region manifest to the target version. + /// 2. Syncs the data region manifest to the target version. + /// 3. Recovers states and returns newly opened logical regions (if metadata was synced) + pub async fn sync_region_from_manifest( + &self, + region_id: RegionId, + manifest_info: RegionManifestInfo, + ) -> Result { + ensure!( + manifest_info.is_metric(), + MetricManifestInfoSnafu { region_id } + ); + + let metadata_region_id = utils::to_metadata_region_id(region_id); + // checked by ensure above + let metadata_manifest_version = manifest_info + .metadata_manifest_version() + .unwrap_or_default(); + let metadata_flushed_entry_id = manifest_info + .metadata_flushed_entry_id() + .unwrap_or_default(); + let metadata_region_manifest = + RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id, 0); + let metadata_synced = self + .mito + .sync_region(metadata_region_id, metadata_region_manifest.into()) + .await + .context(MitoSyncOperationSnafu)? + .is_data_synced(); + + let data_region_id = utils::to_data_region_id(region_id); + let data_manifest_version = manifest_info.data_manifest_version(); + let data_flushed_entry_id = manifest_info.data_flushed_entry_id(); + let data_region_manifest = + RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id, 0); + + let data_synced = self + .mito + .sync_region(data_region_id, data_region_manifest.into()) + .await + .context(MitoSyncOperationSnafu)? + .is_data_synced(); + + if !metadata_synced { + return Ok(SyncRegionFromResponse::Metric { + metadata_synced, + data_synced, + new_opened_logical_region_ids: vec![], + }); + } + + let now = Instant::now(); + // Recovers the states from the metadata region + // if the metadata manifest version is updated. + let physical_region_options = *self + .state + .read() + .unwrap() + .physical_region_states() + .get(&data_region_id) + .context(PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + })? + .options(); + let new_opened_logical_region_ids = self + .recover_states(data_region_id, physical_region_options) + .await?; + info!( + "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}", + data_region_id, + now.elapsed(), + new_opened_logical_region_ids + ); + + Ok(SyncRegionFromResponse::Metric { + metadata_synced, + data_synced, + new_opened_logical_region_ids, + }) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use api::v1::SemanticType; + use common_query::prelude::greptime_timestamp; + use common_telemetry::info; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use store_api::metadata::ColumnMetadata; + use store_api::region_engine::{RegionEngine, RegionManifestInfo}; + use store_api::region_request::{ + AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest, + }; + use store_api::storage::RegionId; + + use crate::metadata_region::MetadataRegion; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn test_sync_region_with_new_created_logical_regions() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await; + env.init_metric_region().await; + + info!("creating follower engine"); + // Create a follower engine. + let (_follower_mito, follower_metric) = env.create_follower_engine().await; + + let physical_region_id = env.default_physical_region_id(); + + // Flushes the physical region + let metric_engine = env.metric(); + metric_engine + .handle_request( + env.default_physical_region_id(), + RegionRequest::Flush(RegionFlushRequest::default()), + ) + .await + .unwrap(); + + let response = follower_metric + .sync_region( + physical_region_id, + RegionManifestInfo::metric(1, 0, 1, 0).into(), + ) + .await + .unwrap(); + assert!(response.is_metric()); + let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap(); + assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]); + + // Sync again, no new logical region should be opened + let response = follower_metric + .sync_region( + physical_region_id, + RegionManifestInfo::metric(1, 0, 1, 0).into(), + ) + .await + .unwrap(); + assert!(response.is_metric()); + let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap(); + assert!(new_opened_logical_region_ids.is_empty()); + } + + fn test_alter_logical_region_request() -> RegionAlterRequest { + RegionAlterRequest { + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "tag1", + ConcreteDataType::string_datatype(), + false, + ), + }, + location: None, + }], + }, + } + } + + #[tokio::test] + async fn test_sync_region_alter_alter_logical_region() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await; + env.init_metric_region().await; + + info!("creating follower engine"); + let physical_region_id = env.default_physical_region_id(); + // Flushes the physical region + let metric_engine = env.metric(); + metric_engine + .handle_request( + env.default_physical_region_id(), + RegionRequest::Flush(RegionFlushRequest::default()), + ) + .await + .unwrap(); + + // Create a follower engine. + let (follower_mito, follower_metric) = env.create_follower_engine().await; + let metric_engine = env.metric(); + let engine_inner = env.metric().inner; + let region_id = env.default_logical_region_id(); + let request = test_alter_logical_region_request(); + + engine_inner + .alter_logical_regions( + physical_region_id, + vec![(region_id, request)], + &mut HashMap::new(), + ) + .await + .unwrap(); + + // Flushes the physical region + metric_engine + .handle_request( + env.default_physical_region_id(), + RegionRequest::Flush(RegionFlushRequest::default()), + ) + .await + .unwrap(); + + // Sync the follower engine + let response = follower_metric + .sync_region( + physical_region_id, + RegionManifestInfo::metric(2, 0, 2, 0).into(), + ) + .await + .unwrap(); + assert!(response.is_metric()); + let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap(); + assert!(new_opened_logical_region_ids.is_empty()); + + let logical_region_id = env.default_logical_region_id(); + let metadata_region = MetadataRegion::new(follower_mito.clone()); + let semantic_type = metadata_region + .column_semantic_type(physical_region_id, logical_region_id, "tag1") + .await + .unwrap() + .unwrap(); + assert_eq!(semantic_type, SemanticType::Tag); + let timestamp_index = metadata_region + .column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp()) + .await + .unwrap() + .unwrap(); + assert_eq!(timestamp_index, SemanticType::Timestamp); + } +} diff --git a/src/metric-engine/src/engine/sync/region.rs b/src/metric-engine/src/engine/sync/region.rs new file mode 100644 index 0000000000..3fe51d1b68 --- /dev/null +++ b/src/metric-engine/src/engine/sync/region.rs @@ -0,0 +1,386 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Instant; + +use common_error::ext::BoxedError; +use common_telemetry::info; +use mito2::manifest::action::RegionEdit; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::region_engine::{MitoCopyRegionFromRequest, SyncRegionFromResponse}; +use store_api::storage::RegionId; + +use crate::engine::MetricEngineInner; +use crate::error::{ + MissingFilesSnafu, MitoCopyRegionFromOperationSnafu, MitoEditRegionSnafu, + PhysicalRegionNotFoundSnafu, Result, +}; +use crate::utils; + +impl MetricEngineInner { + /// Syncs the logical regions from the source region to the target region in the metric engine. + /// + /// This operation: + /// 1. Copies SST files from source metadata region to target metadata region + /// 2. Transforms logical region metadata (updates region numbers to match target) + /// 3. Edits target manifest to remove old file entries (copied files) + /// 4. Recovers states and returns newly opened logical region IDs + /// + /// **Note**: Only the metadata region is synced. The data region is not affected. + pub(crate) async fn sync_region_from_region( + &self, + region_id: RegionId, + source_region_id: RegionId, + parallelism: usize, + ) -> Result { + let source_metadata_region_id = utils::to_metadata_region_id(source_region_id); + let target_metadata_region_id = utils::to_metadata_region_id(region_id); + let target_data_region_id = utils::to_data_region_id(region_id); + let source_data_region_id = utils::to_data_region_id(source_region_id); + info!( + "Syncing region from region {} to region {}, parallelism: {}", + source_region_id, region_id, parallelism + ); + + let res = self + .mito + .copy_region_from( + target_metadata_region_id, + MitoCopyRegionFromRequest { + source_region_id: source_metadata_region_id, + parallelism, + }, + ) + .await + .map_err(BoxedError::new) + .context(MitoCopyRegionFromOperationSnafu { + source_region_id: source_metadata_region_id, + target_region_id: target_metadata_region_id, + })?; + + if res.copied_file_ids.is_empty() { + info!( + "No files were copied from source region {} to target region {}, copied file ids are empty", + source_metadata_region_id, target_metadata_region_id + ); + return Ok(SyncRegionFromResponse::Metric { + metadata_synced: false, + data_synced: false, + new_opened_logical_region_ids: vec![], + }); + } + + let target_region = self.mito.find_region(target_metadata_region_id).context( + PhysicalRegionNotFoundSnafu { + region_id: target_metadata_region_id, + }, + )?; + let files_to_remove = target_region.file_metas(&res.copied_file_ids).await; + let missing_file_ids = res + .copied_file_ids + .iter() + .zip(&files_to_remove) + .filter_map(|(file_id, maybe_meta)| { + if maybe_meta.is_none() { + Some(*file_id) + } else { + None + } + }) + .collect::>(); + // `copy_region_from` does not trigger compaction, + // so there should be no files removed and thus no missing files. + ensure!( + missing_file_ids.is_empty(), + MissingFilesSnafu { + region_id: target_metadata_region_id, + file_ids: missing_file_ids, + } + ); + let files_to_remove = files_to_remove.into_iter().flatten().collect::>(); + // Transform the logical region metadata of the target data region. + self.metadata_region + .transform_logical_region_metadata(target_data_region_id, source_data_region_id) + .await?; + + let edit = RegionEdit { + files_to_add: vec![], + files_to_remove: files_to_remove.clone(), + timestamp_ms: Some(chrono::Utc::now().timestamp_millis()), + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + committed_sequence: None, + }; + self.mito + .edit_region(target_metadata_region_id, edit) + .await + .map_err(BoxedError::new) + .context(MitoEditRegionSnafu { + region_id: target_metadata_region_id, + })?; + info!( + "Successfully edit metadata region: {} after syncing from source metadata region: {}, files to remove: {:?}", + target_metadata_region_id, + source_metadata_region_id, + files_to_remove + .iter() + .map(|meta| meta.file_id) + .collect::>(), + ); + + let now = Instant::now(); + // Always recover states from the target metadata region after syncing + // from the source metadata region. + let physical_region_options = *self + .state + .read() + .unwrap() + .physical_region_states() + .get(&target_data_region_id) + .context(PhysicalRegionNotFoundSnafu { + region_id: target_data_region_id, + })? + .options(); + let new_opened_logical_region_ids = self + .recover_states(target_data_region_id, physical_region_options) + .await?; + info!( + "Sync metadata region from source region {} to target region {}, recover states cost: {:?}, new opened logical region ids: {:?}", + source_metadata_region_id, + target_metadata_region_id, + now.elapsed(), + new_opened_logical_region_ids + ); + + Ok(SyncRegionFromResponse::Metric { + metadata_synced: true, + data_synced: false, + new_opened_logical_region_ids, + }) + } +} + +#[cfg(test)] +mod tests { + + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; + use common_telemetry::debug; + use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY}; + use store_api::region_engine::{RegionEngine, SyncRegionFromRequest}; + use store_api::region_request::{ + BatchRegionDdlRequest, PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, + RegionRequest, + }; + use store_api::storage::RegionId; + + use crate::metadata_region::MetadataRegion; + use crate::test_util::{TestEnv, create_logical_region_request}; + + async fn assert_logical_table_columns( + metadata_region: &MetadataRegion, + physical_region_id: RegionId, + logical_region_id: RegionId, + expected_columns: &[&str], + ) { + let mut columns = metadata_region + .logical_columns(physical_region_id, logical_region_id) + .await + .unwrap() + .into_iter() + .map(|(n, _)| n) + .collect::>(); + columns.sort_unstable(); + assert_eq!(columns, expected_columns); + } + + #[tokio::test] + async fn test_sync_region_from_region() { + common_telemetry::init_default_ut_logging(); + let env = TestEnv::new().await; + let metric_engine = env.metric(); + let source_physical_region_id = RegionId::new(1024, 0); + let logical_region_id1 = RegionId::new(1025, 0); + let logical_region_id2 = RegionId::new(1026, 0); + env.create_physical_region(source_physical_region_id, "/test_dir1", vec![]) + .await; + let region_create_request1 = + create_logical_region_request(&["job"], source_physical_region_id, "logical1"); + let region_create_request2 = + create_logical_region_request(&["host"], source_physical_region_id, "logical2"); + metric_engine + .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![ + (logical_region_id1, region_create_request1), + (logical_region_id2, region_create_request2), + ])) + .await + .unwrap(); + debug!("Flushing source physical region"); + metric_engine + .handle_request( + source_physical_region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + let logical_regions = metric_engine + .logical_regions(source_physical_region_id) + .await + .unwrap(); + assert!(logical_regions.contains(&logical_region_id1)); + assert!(logical_regions.contains(&logical_region_id2)); + + let target_physical_region_id = RegionId::new(1024, 1); + let target_logical_region_id1 = RegionId::new(1025, 1); + let target_logical_region_id2 = RegionId::new(1026, 1); + // Prepare target physical region + env.create_physical_region(target_physical_region_id, "/test_dir1", vec![]) + .await; + let r = metric_engine + .sync_region( + target_physical_region_id, + SyncRegionFromRequest::FromRegion { + source_region_id: source_physical_region_id, + parallelism: 1, + }, + ) + .await + .unwrap(); + let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap(); + assert_eq!(new_opened_logical_region_ids.len(), 2); + assert!(new_opened_logical_region_ids.contains(&target_logical_region_id1)); + assert!(new_opened_logical_region_ids.contains(&target_logical_region_id2)); + debug!("Sync region from again"); + assert_logical_table_columns( + &env.metadata_region(), + target_physical_region_id, + target_logical_region_id1, + &["greptime_timestamp", "greptime_value", "job"], + ) + .await; + assert_logical_table_columns( + &env.metadata_region(), + target_physical_region_id, + target_logical_region_id2, + &["greptime_timestamp", "greptime_value", "host"], + ) + .await; + let logical_regions = env + .metadata_region() + .logical_regions(target_physical_region_id) + .await + .unwrap(); + assert_eq!(logical_regions.len(), 2); + assert!(logical_regions.contains(&target_logical_region_id1)); + assert!(logical_regions.contains(&target_logical_region_id2)); + + // Should be ok to sync region from again. + let r = metric_engine + .sync_region( + target_physical_region_id, + SyncRegionFromRequest::FromRegion { + source_region_id: source_physical_region_id, + parallelism: 1, + }, + ) + .await + .unwrap(); + let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap(); + assert!(new_opened_logical_region_ids.is_empty()); + + // Try to close region and reopen it, should be ok. + metric_engine + .handle_request( + target_physical_region_id, + RegionRequest::Close(RegionCloseRequest {}), + ) + .await + .unwrap(); + let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] + .into_iter() + .collect(); + metric_engine + .handle_request( + target_physical_region_id, + RegionRequest::Open(RegionOpenRequest { + engine: METRIC_ENGINE_NAME.to_string(), + table_dir: "/test_dir1".to_string(), + path_type: PathType::Bare, + options: physical_region_option, + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + let logical_regions = env + .metadata_region() + .logical_regions(target_physical_region_id) + .await + .unwrap(); + assert_eq!(logical_regions.len(), 2); + assert!(logical_regions.contains(&target_logical_region_id1)); + assert!(logical_regions.contains(&target_logical_region_id2)); + } + + #[tokio::test] + async fn test_sync_region_from_region_with_no_files() { + common_telemetry::init_default_ut_logging(); + let env = TestEnv::new().await; + let metric_engine = env.metric(); + let source_physical_region_id = RegionId::new(1024, 0); + env.create_physical_region(source_physical_region_id, "/test_dir1", vec![]) + .await; + let target_physical_region_id = RegionId::new(1024, 1); + env.create_physical_region(target_physical_region_id, "/test_dir1", vec![]) + .await; + let r = metric_engine + .sync_region( + target_physical_region_id, + SyncRegionFromRequest::FromRegion { + source_region_id: source_physical_region_id, + parallelism: 1, + }, + ) + .await + .unwrap(); + let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap(); + assert!(new_opened_logical_region_ids.is_empty()); + } + + #[tokio::test] + async fn test_sync_region_from_region_source_not_exist() { + common_telemetry::init_default_ut_logging(); + let env = TestEnv::new().await; + let metric_engine = env.metric(); + let source_physical_region_id = RegionId::new(1024, 0); + let target_physical_region_id = RegionId::new(1024, 1); + env.create_physical_region(target_physical_region_id, "/test_dir1", vec![]) + .await; + let err = metric_engine + .sync_region( + target_physical_region_id, + SyncRegionFromRequest::FromRegion { + source_region_id: source_physical_region_id, + parallelism: 1, + }, + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } +} diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 3d00b737c3..7366232458 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -21,7 +21,7 @@ use common_macro::stack_trace_debug; use datatypes::prelude::ConcreteDataType; use snafu::{Location, Snafu}; use store_api::region_request::RegionRequest; -use store_api::storage::RegionId; +use store_api::storage::{FileId, RegionId}; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -128,6 +128,27 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Mito copy region from operation fails, source region id: {}, target region id: {}", + source_region_id, + target_region_id + ))] + MitoCopyRegionFromOperation { + source: BoxedError, + #[snafu(implicit)] + location: Location, + source_region_id: RegionId, + target_region_id: RegionId, + }, + + #[snafu(display("Mito edit region operation fails, region id: {}", region_id))] + MitoEditRegion { + region_id: RegionId, + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to encode primary key"))] EncodePrimaryKey { source: mito_codec::error::Error, @@ -256,6 +277,21 @@ pub enum Error { location: Location, }, + #[snafu(display("Unsupported sync region from request for region {}", region_id))] + UnsupportedSyncRegionFromRequest { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Missing file metas in region {}, file ids: {:?}", region_id, file_ids))] + MissingFiles { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + file_ids: Vec, + }, + #[snafu(display("Unsupported alter kind: {}", kind))] UnsupportedAlterKind { kind: String, @@ -339,11 +375,12 @@ impl ErrorExt for Error { | ParseRegionOptions { .. } | UnexpectedRequest { .. } | UnsupportedAlterKind { .. } - | UnsupportedRemapManifestsRequest { .. } => StatusCode::InvalidArguments, + | UnsupportedRemapManifestsRequest { .. } + | UnsupportedSyncRegionFromRequest { .. } => StatusCode::InvalidArguments, - ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => { - StatusCode::Unsupported - } + ForbiddenPhysicalAlter { .. } + | UnsupportedRegionRequest { .. } + | MissingFiles { .. } => StatusCode::Unsupported, DeserializeColumnMetadata { .. } | SerializeColumnMetadata { .. } @@ -369,7 +406,9 @@ impl ErrorExt for Error { | MitoSyncOperation { source, .. } | MitoEnterStagingOperation { source, .. } | BatchOpenMitoRegion { source, .. } - | BatchCatchupMitoRegion { source, .. } => source.status_code(), + | BatchCatchupMitoRegion { source, .. } + | MitoCopyRegionFromOperation { source, .. } + | MitoEditRegion { source, .. } => source.status_code(), EncodePrimaryKey { source, .. } => source.status_code(), diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index c34b44e4a7..dd3f45d5b7 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -25,6 +25,7 @@ use base64::Engine; use base64::engine::general_purpose::STANDARD_NO_PAD; use common_base::readable_size::ReadableSize; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use common_telemetry::{debug, info, warn}; use datafusion::prelude::{col, lit}; use futures_util::TryStreamExt; use futures_util::stream::BoxStream; @@ -400,14 +401,11 @@ impl MetadataRegion { .await .context(CacheGetSnafu)?; - let range = region_metadata.key_values.range(prefix.to_string()..); let mut result = HashMap::new(); - for (k, v) in range { - if !k.starts_with(prefix) { - break; - } - result.insert(k.clone(), v.clone()); - } + get_all_with_prefix(®ion_metadata, prefix, |k, v| { + result.insert(k.to_string(), v.to_string()); + Ok(()) + })?; Ok(result) } @@ -558,6 +556,109 @@ impl MetadataRegion { Ok(()) } + + /// Updates logical region metadata so that any entries previously referencing + /// `source_region_id` are modified to reference the data region of `physical_region_id`. + /// + /// This method should be called after copying files from `source_region_id` + /// into the target region. It scans the metadata for the target physical + /// region, finds logical regions with the same region number as the source, + /// and reinserts region and column entries updated to use the target's + /// region number. + pub async fn transform_logical_region_metadata( + &self, + physical_region_id: RegionId, + source_region_id: RegionId, + ) -> Result<()> { + let metadata_region_id = utils::to_metadata_region_id(physical_region_id); + let data_region_id = utils::to_data_region_id(physical_region_id); + let logical_regions = self + .logical_regions(data_region_id) + .await? + .into_iter() + .filter(|r| r.region_number() == source_region_id.region_number()) + .collect::>(); + if logical_regions.is_empty() { + info!( + "No logical regions found from source region {}, physical region id: {}", + source_region_id, physical_region_id, + ); + return Ok(()); + } + + let metadata = self.load_all(metadata_region_id).await?; + let mut output = Vec::new(); + for logical_region_id in &logical_regions { + let prefix = MetadataRegion::concat_column_key_prefix(*logical_region_id); + get_all_with_prefix(&metadata, &prefix, |k, v| { + // Safety: we have checked the prefix + let (src_logical_region_id, column_name) = Self::parse_column_key(k)?.unwrap(); + // Change the region number to the data region number. + let new_key = MetadataRegion::concat_column_key( + RegionId::new( + src_logical_region_id.table_id(), + data_region_id.region_number(), + ), + &column_name, + ); + output.push((new_key, v.to_string())); + Ok(()) + })?; + + let new_key = MetadataRegion::concat_region_key(RegionId::new( + logical_region_id.table_id(), + data_region_id.region_number(), + )); + output.push((new_key, String::new())); + } + + if output.is_empty() { + warn!( + "No logical regions metadata found from source region {}, physical region id: {}", + source_region_id, physical_region_id + ); + return Ok(()); + } + + debug!( + "Transform logical regions metadata to physical region {}, source region: {}, transformed metadata: {}", + data_region_id, + source_region_id, + output.len(), + ); + + let put_request = MetadataRegion::build_put_request_from_iter(output.into_iter()); + self.mito + .handle_request( + metadata_region_id, + store_api::region_request::RegionRequest::Put(put_request), + ) + .await + .context(MitoWriteOperationSnafu)?; + info!( + "Transformed {} logical regions metadata to physical region {}, source region: {}", + logical_regions.len(), + data_region_id, + source_region_id + ); + self.cache.invalidate(&metadata_region_id).await; + Ok(()) + } +} + +fn get_all_with_prefix( + region_metadata: &RegionMetadataCacheEntry, + prefix: &str, + mut callback: impl FnMut(&str, &str) -> Result<()>, +) -> Result<()> { + let range = region_metadata.key_values.range(prefix.to_string()..); + for (k, v) in range { + if !k.starts_with(prefix) { + break; + } + callback(k, v)?; + } + Ok(()) } #[cfg(test)] diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 2b7b834cd3..4ac52683cf 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -89,6 +89,7 @@ use api::region::RegionResponse; use async_trait::async_trait; use common_base::Plugins; use common_error::ext::BoxedError; +use common_meta::error::UnexpectedSnafu; use common_meta::key::SchemaMetadataManagerRef; use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream}; use common_stat::get_total_memory_bytes; @@ -107,10 +108,10 @@ use store_api::metric_engine_consts::{ MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY, }; use store_api::region_engine::{ - BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, MitoCopyRegionFromResponse, - RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, - SettableRegionRoleState, SyncManifestResponse, + BatchResponses, MitoCopyRegionFromRequest, MitoCopyRegionFromResponse, RegionEngine, + RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest, + RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState, + SyncRegionFromRequest, SyncRegionFromResponse, }; use store_api::region_request::{ AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest, @@ -124,8 +125,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::config::MitoConfig; use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin}; use crate::error::{ - self, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, - Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu, + InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, + SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu, }; #[cfg(feature = "enterprise")] use crate::extension::BoxedExtensionRangeProviderFactory; @@ -397,7 +398,7 @@ impl MitoEngine { } /// Edit region's metadata by [RegionEdit] directly. Use with care. - /// Now we only allow adding files to region (the [RegionEdit] struct can only contain a non-empty "files_to_add" field). + /// Now we only allow adding files or removing files from region (the [RegionEdit] struct can only contain a non-empty "files_to_add" or "files_to_remove" field). /// Other region editing intention will result in an "invalid request" error. /// Also note that if a region is to be edited directly, we MUST not write data to it thereafter. pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> { @@ -432,7 +433,7 @@ impl MitoEngine { pub async fn copy_region_from( &self, region_id: RegionId, - request: CopyRegionFromRequest, + request: MitoCopyRegionFromRequest, ) -> Result { self.inner.copy_region_from(region_id, request).await } @@ -641,8 +642,7 @@ impl MitoEngine { /// /// Only adding or removing files to region is considered valid now. fn is_valid_region_edit(edit: &RegionEdit) -> bool { - !edit.files_to_add.is_empty() - && edit.files_to_remove.is_empty() + (!edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty()) && matches!( edit, RegionEdit { @@ -1075,7 +1075,7 @@ impl EngineInner { async fn copy_region_from( &self, region_id: RegionId, - request: CopyRegionFromRequest, + request: MitoCopyRegionFromRequest, ) -> Result { let (request, receiver) = WorkerRequest::try_from_copy_region_from_request(region_id, request)?; @@ -1249,15 +1249,21 @@ impl RegionEngine for MitoEngine { async fn sync_region( &self, region_id: RegionId, - manifest_info: RegionManifestInfo, - ) -> Result { + request: SyncRegionFromRequest, + ) -> Result { + let manifest_info = request + .into_region_manifest_info() + .context(UnexpectedSnafu { + err_msg: "Expected a manifest info request", + }) + .map_err(BoxedError::new)?; let (_, synced) = self .inner .sync_region(region_id, manifest_info) .await .map_err(BoxedError::new)?; - Ok(SyncManifestResponse::Mito { synced }) + Ok(SyncRegionFromResponse::Mito { synced }) } async fn remap_manifests( @@ -1270,19 +1276,6 @@ impl RegionEngine for MitoEngine { .map_err(BoxedError::new) } - async fn copy_region_from( - &self, - _region_id: RegionId, - _request: CopyRegionFromRequest, - ) -> Result { - Err(BoxedError::new( - error::UnsupportedOperationSnafu { - err_msg: "copy_region_from is not supported", - } - .build(), - )) - } - fn role(&self, region_id: RegionId) -> Option { self.inner.role(region_id) } @@ -1421,7 +1414,7 @@ mod tests { }; assert!(is_valid_region_edit(&edit)); - // Invalid: "files_to_add" is empty + // Invalid: "files_to_add" and "files_to_remove" are both empty let edit = RegionEdit { files_to_add: vec![], files_to_remove: vec![], @@ -1433,7 +1426,7 @@ mod tests { }; assert!(!is_valid_region_edit(&edit)); - // Invalid: "files_to_remove" is not empty + // Valid: "files_to_remove" is not empty let edit = RegionEdit { files_to_add: vec![FileMeta::default()], files_to_remove: vec![FileMeta::default()], @@ -1443,7 +1436,7 @@ mod tests { flushed_sequence: None, committed_sequence: None, }; - assert!(!is_valid_region_edit(&edit)); + assert!(is_valid_region_edit(&edit)); // Invalid: other fields are not all "None"s let edit = RegionEdit { diff --git a/src/mito2/src/engine/copy_region_from_test.rs b/src/mito2/src/engine/copy_region_from_test.rs index c42e1fc781..75580d5c0b 100644 --- a/src/mito2/src/engine/copy_region_from_test.rs +++ b/src/mito2/src/engine/copy_region_from_test.rs @@ -20,7 +20,7 @@ use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use object_store::layers::mock::{Error as MockError, ErrorKind, MockLayerBuilder}; -use store_api::region_engine::{CopyRegionFromRequest, RegionEngine, RegionRole}; +use store_api::region_engine::{MitoCopyRegionFromRequest, RegionEngine, RegionRole}; use store_api::region_request::{RegionFlushRequest, RegionRequest}; use store_api::storage::RegionId; @@ -89,7 +89,7 @@ async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index: let resp = engine .copy_region_from( target_region_id, - CopyRegionFromRequest { + MitoCopyRegionFromRequest { source_region_id, parallelism: 1, }, @@ -126,7 +126,7 @@ async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index: let resp2 = engine .copy_region_from( target_region_id, - CopyRegionFromRequest { + MitoCopyRegionFromRequest { source_region_id, parallelism: 1, }, @@ -207,7 +207,7 @@ async fn test_engine_copy_region_failure_with_format(flat_format: bool) { let err = engine .copy_region_from( target_region_id, - CopyRegionFromRequest { + MitoCopyRegionFromRequest { source_region_id, parallelism: 1, }, @@ -225,7 +225,6 @@ async fn test_engine_copy_region_failure_with_format(flat_format: bool) { let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display()); assert_file_num_in_dir(&source_region_dir, 1); assert_file_num_in_dir(&format!("{}/index", source_region_dir), 1); - assert_eq!( source_region_files, collect_filename_in_dir(&source_region_dir) @@ -298,7 +297,7 @@ async fn test_engine_copy_region_invalid_args_with_format(flat_format: bool) { let err = engine .copy_region_from( region_id, - CopyRegionFromRequest { + MitoCopyRegionFromRequest { source_region_id: RegionId::new(2, 1), parallelism: 1, }, @@ -309,7 +308,7 @@ async fn test_engine_copy_region_invalid_args_with_format(flat_format: bool) { let err = engine .copy_region_from( region_id, - CopyRegionFromRequest { + MitoCopyRegionFromRequest { source_region_id: RegionId::new(1, 1), parallelism: 1, }, @@ -347,7 +346,7 @@ async fn test_engine_copy_region_unexpected_state_with_format(flat_format: bool) let err = engine .copy_region_from( region_id, - CopyRegionFromRequest { + MitoCopyRegionFromRequest { source_region_id: RegionId::new(1, 2), parallelism: 1, }, diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index 6b98d4ba0f..65b997e498 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -153,7 +153,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) { // Returns error since the max manifest is 1 let manifest_info = RegionManifestInfo::mito(2, 0, 0); let err = follower_engine - .sync_region(region_id, manifest_info) + .sync_region(region_id, manifest_info.into()) .await .unwrap_err(); let err = err.as_any().downcast_ref::().unwrap(); @@ -161,7 +161,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) { let manifest_info = RegionManifestInfo::mito(1, 0, 0); follower_engine - .sync_region(region_id, manifest_info) + .sync_region(region_id, manifest_info.into()) .await .unwrap(); common_telemetry::info!("Scan the region on the follower engine after sync"); @@ -266,7 +266,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) { // Sync the region from the leader engine to the follower engine let manifest_info = RegionManifestInfo::mito(2, 0, 0); follower_engine - .sync_region(region_id, manifest_info) + .sync_region(region_id, manifest_info.into()) .await .unwrap(); let expected = "\ diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 14afa1c76d..7c40ee0d23 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -824,13 +824,13 @@ impl WorkerRequest { Ok((WorkerRequest::RemapManifests(request), receiver)) } - /// Converts [CopyRegionFromRequest] from a [CopyRegionFromRequest](store_api::region_engine::CopyRegionFromRequest). + /// Converts [CopyRegionFromRequest] from a [MitoCopyRegionFromRequest](store_api::region_engine::MitoCopyRegionFromRequest). pub(crate) fn try_from_copy_region_from_request( region_id: RegionId, - store_api::region_engine::CopyRegionFromRequest { + store_api::region_engine::MitoCopyRegionFromRequest { source_region_id, parallelism, - }: store_api::region_engine::CopyRegionFromRequest, + }: store_api::region_engine::MitoCopyRegionFromRequest, ) -> Result<(WorkerRequest, Receiver>)> { let (sender, receiver) = oneshot::channel(); let request = CopyRegionFromRequest { diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index 8258b13490..08d4d4774b 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -28,9 +28,9 @@ use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; use store_api::region_engine::{ - CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole, - RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, - SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest, + RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState, + SyncRegionFromRequest, SyncRegionFromResponse, }; use store_api::region_request::RegionRequest; use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber}; @@ -113,8 +113,8 @@ impl RegionEngine for MetaRegionEngine { async fn sync_region( &self, _region_id: RegionId, - _manifest_info: RegionManifestInfo, - ) -> Result { + _request: SyncRegionFromRequest, + ) -> Result { unimplemented!() } @@ -125,14 +125,6 @@ impl RegionEngine for MetaRegionEngine { unimplemented!() } - async fn copy_region_from( - &self, - _region_id: RegionId, - _request: CopyRegionFromRequest, - ) -> Result { - unimplemented!() - } - fn role(&self, _region_id: RegionId) -> Option { None } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index dd7809a0bf..a179d1dc14 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -637,9 +637,64 @@ impl RegionStatistic { } } -/// The response of syncing the manifest. +/// Request to sync the region from a manifest or a region. +#[derive(Debug, Clone)] +pub enum SyncRegionFromRequest { + /// Syncs the region using manifest information. + /// Used in leader-follower manifest sync scenarios. + FromManifest(RegionManifestInfo), + /// Syncs the region from another region. + /// + /// Used by the metric engine to sync logical regions from a source physical region + /// to a target physical region. This copies metadata region SST files and transforms + /// logical region entries to use the target's region number. + FromRegion { + /// The [`RegionId`] of the source region. + source_region_id: RegionId, + /// The parallelism of the sync operation. + parallelism: usize, + }, +} + +impl From for SyncRegionFromRequest { + fn from(manifest_info: RegionManifestInfo) -> Self { + SyncRegionFromRequest::FromManifest(manifest_info) + } +} + +impl SyncRegionFromRequest { + /// Creates a new request from a manifest info. + pub fn from_manifest(manifest_info: RegionManifestInfo) -> Self { + SyncRegionFromRequest::FromManifest(manifest_info) + } + + /// Creates a new request from a region. + pub fn from_region(source_region_id: RegionId, parallelism: usize) -> Self { + SyncRegionFromRequest::FromRegion { + source_region_id, + parallelism, + } + } + + /// Returns true if the request is from a manifest. + pub fn is_from_manifest(&self) -> bool { + matches!(self, SyncRegionFromRequest::FromManifest { .. }) + } + + /// Converts the request to a region manifest info. + /// + /// Returns None if the request is not from a manifest. + pub fn into_region_manifest_info(self) -> Option { + match self { + SyncRegionFromRequest::FromManifest(manifest_info) => Some(manifest_info), + SyncRegionFromRequest::FromRegion { .. } => None, + } + } +} + +/// The response of syncing the region. #[derive(Debug)] -pub enum SyncManifestResponse { +pub enum SyncRegionFromResponse { NotSupported, Mito { /// Indicates if the data region was synced. @@ -656,35 +711,30 @@ pub enum SyncManifestResponse { }, } -impl SyncManifestResponse { +impl SyncRegionFromResponse { /// Returns true if data region is synced. pub fn is_data_synced(&self) -> bool { match self { - SyncManifestResponse::NotSupported => false, - SyncManifestResponse::Mito { synced } => *synced, - SyncManifestResponse::Metric { data_synced, .. } => *data_synced, + SyncRegionFromResponse::NotSupported => false, + SyncRegionFromResponse::Mito { synced } => *synced, + SyncRegionFromResponse::Metric { data_synced, .. } => *data_synced, } } - /// Returns true if the engine is supported the sync operation. - pub fn is_supported(&self) -> bool { - matches!(self, SyncManifestResponse::NotSupported) - } - /// Returns true if the engine is a mito2 engine. pub fn is_mito(&self) -> bool { - matches!(self, SyncManifestResponse::Mito { .. }) + matches!(self, SyncRegionFromResponse::Mito { .. }) } /// Returns true if the engine is a metric engine. pub fn is_metric(&self) -> bool { - matches!(self, SyncManifestResponse::Metric { .. }) + matches!(self, SyncRegionFromResponse::Metric { .. }) } /// Returns the new opened logical region ids. pub fn new_opened_logical_region_ids(self) -> Option> { match self { - SyncManifestResponse::Metric { + SyncRegionFromResponse::Metric { new_opened_logical_region_ids, .. } => Some(new_opened_logical_region_ids), @@ -715,7 +765,7 @@ pub struct RemapManifestsResponse { /// Request to copy files from a source region to a target region. #[derive(Debug, Clone)] -pub struct CopyRegionFromRequest { +pub struct MitoCopyRegionFromRequest { /// The [`RegionId`] of the source region. pub source_region_id: RegionId, /// The parallelism of the copy operation. @@ -728,37 +778,6 @@ pub struct MitoCopyRegionFromResponse { pub copied_file_ids: Vec, } -#[derive(Debug, Clone)] -pub struct MetricCopyRegionFromResponse { - /// The logical regions that were newly opened after the copy operation. - pub new_opened_logical_region_ids: Vec, -} - -/// Response to copy region from a source region to a target region. -#[derive(Debug, Clone)] -pub enum CopyRegionFromResponse { - Mito(MitoCopyRegionFromResponse), - Metric(MetricCopyRegionFromResponse), -} - -impl CopyRegionFromResponse { - /// Converts the response to a mito2 response. - pub fn into_mito(self) -> Option { - match self { - CopyRegionFromResponse::Mito(response) => Some(response), - CopyRegionFromResponse::Metric(_) => None, - } - } - - /// Converts the response to a metric response. - pub fn into_metric(self) -> Option { - match self { - CopyRegionFromResponse::Metric(response) => Some(response), - CopyRegionFromResponse::Mito(_) => None, - } - } -} - #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine @@ -880,8 +899,8 @@ pub trait RegionEngine: Send + Sync { async fn sync_region( &self, region_id: RegionId, - manifest_info: RegionManifestInfo, - ) -> Result; + request: SyncRegionFromRequest, + ) -> Result; /// Remaps manifests from old regions to new regions. async fn remap_manifests( @@ -889,13 +908,6 @@ pub trait RegionEngine: Send + Sync { request: RemapManifestsRequest, ) -> Result; - /// Copies region from a source region to a target region. - async fn copy_region_from( - &self, - region_id: RegionId, - request: CopyRegionFromRequest, - ) -> Result; - /// Sets region role state gracefully. /// /// After the call returns, the engine ensures no more write operations will succeed in the region.