diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index 4f1b0da7d8..ea204078d3 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, ) -> Result { - let template = build_template_from_raw_table_info(raw_table_info)?; + let template = build_template_from_raw_table_info(raw_table_info, false)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/common/meta/src/ddl/create_table/template.rs b/src/common/meta/src/ddl/create_table/template.rs index a94985a9b8..51cbe999f3 100644 --- a/src/common/meta/src/ddl/create_table/template.rs +++ b/src/common/meta/src/ddl/create_table/template.rs @@ -20,7 +20,9 @@ use api::v1::region::{CreateRequest, RegionColumnDef}; use api::v1::{ColumnDef, CreateTableExpr, SemanticType}; use common_telemetry::warn; use snafu::{OptionExt, ResultExt}; -use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; +use store_api::metric_engine_consts::{ + LOGICAL_TABLE_METADATA_KEY, is_metric_engine_internal_column, +}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::{RawTableInfo, TableId}; @@ -30,34 +32,45 @@ use crate::wal_provider::prepare_wal_options; /// Constructs a [CreateRequest] based on the provided [RawTableInfo]. /// /// Note: This function is primarily intended for creating logical tables or allocating placeholder regions. 
-pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result { +pub fn build_template_from_raw_table_info( + raw_table_info: &RawTableInfo, + skip_internal_columns: bool, +) -> Result { let primary_key_indices = &raw_table_info.meta.primary_key_indices; - let column_defs = raw_table_info + let filtered = raw_table_info .meta .schema .column_schemas .iter() .enumerate() + .filter(|(_, c)| !skip_internal_columns || !is_metric_engine_internal_column(&c.name)) .map(|(i, c)| { let is_primary_key = primary_key_indices.contains(&i); let column_def = try_as_column_def(c, is_primary_key) .context(error::ConvertColumnDefSnafu { column: &c.name })?; - - Ok(RegionColumnDef { - column_def: Some(column_def), - // The column id will be overridden by the metric engine. - // So we just use the index as the column id. - column_id: i as u32, - }) + Ok(( + is_primary_key.then_some(i), + RegionColumnDef { + column_def: Some(column_def), + // The column id will be overridden by the metric engine. + // So we just use the index as the column id. 
+ column_id: i as u32, + }, + )) }) - .collect::>>()?; + .collect::, RegionColumnDef)>>>()?; + let (new_primary_key_indices, column_defs): (Vec<_>, Vec<_>) = filtered.into_iter().unzip(); let options = HashMap::from(&raw_table_info.meta.options); let template = CreateRequest { region_id: 0, engine: raw_table_info.meta.engine.clone(), column_defs, - primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(), + primary_key: new_primary_key_indices + .iter() + .flatten() + .map(|i| *i as u32) + .collect(), path: String::new(), options, partition: None, diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index ef7a79c61d..af911853de 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -17,6 +17,7 @@ use std::fmt::{Display, Formatter}; use std::time::Duration; use serde::{Deserialize, Deserializer, Serialize}; +use store_api::region_engine::SyncRegionFromRequest; use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber}; use strum::Display; use table::metadata::TableId; @@ -530,6 +531,25 @@ impl Display for EnterStagingRegion { } } +/// Instruction payload for syncing a region from a manifest or another region. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct SyncRegion { + /// Region id to sync. + pub region_id: RegionId, + /// Request to sync the region. + pub request: SyncRegionFromRequest, +} + +impl Display for SyncRegion { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "SyncRegion(region_id={}, request={:?})", + self.region_id, self.request + ) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct RemapManifest { pub region_id: RegionId, @@ -602,8 +622,11 @@ pub enum Instruction { Suspend, /// Makes regions enter staging state. EnterStagingRegions(Vec), + /// Syncs regions. + SyncRegions(Vec), /// Remaps manifests for a region. 
RemapManifest(RemapManifest), + /// Applies staging manifests for a region. ApplyStagingManifests(Vec), } @@ -669,6 +692,13 @@ impl Instruction { _ => None, } } + + pub fn into_sync_regions(self) -> Option> { + match self { + Self::SyncRegions(sync_regions) => Some(sync_regions), + _ => None, + } + } } /// The reply of [UpgradeRegion]. @@ -784,6 +814,31 @@ impl EnterStagingRegionsReply { } } +/// Reply for a single region sync request. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct SyncRegionReply { + /// Region id of the synced region. + pub region_id: RegionId, + /// Returns true if the region is successfully synced and ready. + pub ready: bool, + /// Indicates whether the region exists. + pub exists: bool, + /// Return error message if any during the operation. + pub error: Option, +} + +/// Reply for a batch of region sync requests. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct SyncRegionsReply { + pub replies: Vec, +} + +impl SyncRegionsReply { + pub fn new(replies: Vec) -> Self { + Self { replies } + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] pub struct RemapManifestReply { /// Returns false if the region does not exist. 
@@ -847,6 +902,7 @@ pub enum InstructionReply { GetFileRefs(GetFileRefsReply), GcRegions(GcRegionsReply), EnterStagingRegions(EnterStagingRegionsReply), + SyncRegions(SyncRegionsReply), RemapManifest(RemapManifestReply), ApplyStagingManifests(ApplyStagingManifestsReply), } @@ -872,6 +928,9 @@ impl Display for InstructionReply { reply.replies ) } + Self::SyncRegions(reply) => { + write!(f, "InstructionReply::SyncRegions({:?})", reply.replies) + } Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply), Self::ApplyStagingManifests(reply) => write!( f, @@ -926,6 +985,13 @@ impl InstructionReply { } } + pub fn expect_sync_regions_reply(self) -> Vec { + match self { + Self::SyncRegions(reply) => reply.replies, + _ => panic!("Expected SyncRegion reply"), + } + } + pub fn expect_remap_manifest_reply(self) -> RemapManifestReply { match self { Self::RemapManifest(reply) => reply, diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs index 17abfe70d1..598fae4781 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs @@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, ) -> Result { - let template = build_template_from_raw_table_info(raw_table_info)?; + let template = build_template_from_raw_table_info(raw_table_info, false)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 8ad8f9da1b..daca5eb0a4 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -31,6 +31,7 @@ mod flush_region; mod gc_worker; mod open_region; mod remap_manifest; +mod sync_region; mod upgrade_region; use 
crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler; @@ -42,6 +43,7 @@ use crate::heartbeat::handler::flush_region::FlushRegionsHandler; use crate::heartbeat::handler::gc_worker::GcRegionsHandler; use crate::heartbeat::handler::open_region::OpenRegionsHandler; use crate::heartbeat::handler::remap_manifest::RemapManifestHandler; +use crate::heartbeat::handler::sync_region::SyncRegionHandler; use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler; use crate::heartbeat::task_tracker::TaskTracker; use crate::region_server::RegionServer; @@ -132,6 +134,7 @@ impl RegionHeartbeatResponseHandler { Instruction::EnterStagingRegions(_) => { Ok(Some(Box::new(EnterStagingRegionsHandler.into()))) } + Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))), Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))), Instruction::ApplyStagingManifests(_) => { Ok(Some(Box::new(ApplyStagingManifestsHandler.into()))) @@ -150,6 +153,7 @@ pub enum InstructionHandlers { GetFileRefs(GetFileRefsHandler), GcRegions(GcRegionsHandler), EnterStagingRegions(EnterStagingRegionsHandler), + SyncRegions(SyncRegionHandler), RemapManifest(RemapManifestHandler), ApplyStagingManifests(ApplyStagingManifestsHandler), } @@ -175,6 +179,7 @@ impl_from_handler!( GetFileRefsHandler => GetFileRefs, GcRegionsHandler => GcRegions, EnterStagingRegionsHandler => EnterStagingRegions, + SyncRegionHandler => SyncRegions, RemapManifestHandler => RemapManifest, ApplyStagingManifestsHandler => ApplyStagingManifests ); @@ -222,6 +227,7 @@ dispatch_instr!( GetFileRefs => GetFileRefs, GcRegions => GcRegions, EnterStagingRegions => EnterStagingRegions, + SyncRegions => SyncRegions, RemapManifest => RemapManifest, ApplyStagingManifests => ApplyStagingManifests, ); diff --git a/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs b/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs index e3ee481f98..1ad5baa56a 100644 --- 
a/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs +++ b/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs @@ -48,19 +48,32 @@ impl ApplyStagingManifestsHandler { ctx: &HandlerContext, request: ApplyStagingManifest, ) -> ApplyStagingManifestReply { - let Some(leader) = ctx.region_server.is_region_leader(request.region_id) else { - warn!("Region: {} is not found", request.region_id); + let ApplyStagingManifest { + region_id, + ref partition_expr, + central_region_id, + ref manifest_path, + } = request; + common_telemetry::info!( + "Datanode received apply staging manifest request, region_id: {}, central_region_id: {}, partition_expr: {}, manifest_path: {}", + region_id, + central_region_id, + partition_expr, + manifest_path + ); + let Some(leader) = ctx.region_server.is_region_leader(region_id) else { + warn!("Region: {} is not found", region_id); return ApplyStagingManifestReply { - region_id: request.region_id, + region_id, exists: false, ready: false, error: None, }; }; if !leader { - warn!("Region: {} is not leader", request.region_id); + warn!("Region: {} is not leader", region_id); return ApplyStagingManifestReply { - region_id: request.region_id, + region_id, exists: true, ready: false, error: Some("Region is not leader".into()), @@ -70,25 +83,25 @@ impl ApplyStagingManifestsHandler { match ctx .region_server .handle_request( - request.region_id, + region_id, RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { - partition_expr: request.partition_expr, - central_region_id: request.central_region_id, - manifest_path: request.manifest_path, + partition_expr: partition_expr.clone(), + central_region_id, + manifest_path: manifest_path.clone(), }), ) .await { Ok(_) => ApplyStagingManifestReply { - region_id: request.region_id, + region_id, exists: true, ready: true, error: None, }, Err(err) => { - error!(err; "Failed to apply staging manifest"); + error!(err; "Failed to apply staging manifest, region_id: {}", region_id); 
ApplyStagingManifestReply { - region_id: request.region_id, + region_id, exists: true, ready: false, error: Some(format!("{err:?}")), diff --git a/src/datanode/src/heartbeat/handler/enter_staging.rs b/src/datanode/src/heartbeat/handler/enter_staging.rs index 9ca2eab1b8..2b9e35ded6 100644 --- a/src/datanode/src/heartbeat/handler/enter_staging.rs +++ b/src/datanode/src/heartbeat/handler/enter_staging.rs @@ -51,6 +51,11 @@ impl EnterStagingRegionsHandler { partition_expr, }: EnterStagingRegion, ) -> EnterStagingRegionReply { + common_telemetry::info!( + "Datanode received enter staging region: {}, partition_expr: {}", + region_id, + partition_expr + ); let Some(writable) = ctx.region_server.is_region_leader(region_id) else { warn!("Region: {} is not found", region_id); return EnterStagingRegionReply { @@ -85,7 +90,7 @@ impl EnterStagingRegionsHandler { error: None, }, Err(err) => { - error!(err; "Failed to enter staging region"); + error!(err; "Failed to enter staging region, region_id: {}", region_id); EnterStagingRegionReply { region_id, ready: false, diff --git a/src/datanode/src/heartbeat/handler/remap_manifest.rs b/src/datanode/src/heartbeat/handler/remap_manifest.rs index 3f57545659..f3fcf72710 100644 --- a/src/datanode/src/heartbeat/handler/remap_manifest.rs +++ b/src/datanode/src/heartbeat/handler/remap_manifest.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply}; -use common_telemetry::warn; +use common_telemetry::{error, info, warn}; use store_api::region_engine::RemapManifestsRequest; use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; @@ -34,6 +34,12 @@ impl InstructionHandler for RemapManifestHandler { region_mapping, new_partition_exprs, } = request; + info!( + "Datanode received remap manifest request, region_id: {}, input_regions: {}, target_regions: {}", + region_id, + input_regions.len(), + new_partition_exprs.len() + ); let Some(leader) = ctx.region_server.is_region_leader(region_id) else { warn!("Region: {} is not found", region_id); return Some(InstructionReply::RemapManifest(RemapManifestReply { @@ -67,11 +73,18 @@ impl InstructionHandler for RemapManifestHandler { manifest_paths: result.manifest_paths, error: None, }), - Err(e) => InstructionReply::RemapManifest(RemapManifestReply { - exists: true, - manifest_paths: Default::default(), - error: Some(format!("{e:?}")), - }), + Err(e) => { + error!( + e; + "Remap manifests failed on datanode, region_id: {}", + region_id + ); + InstructionReply::RemapManifest(RemapManifestReply { + exists: true, + manifest_paths: Default::default(), + error: Some(format!("{e:?}")), + }) + } }; Some(reply) diff --git a/src/datanode/src/heartbeat/handler/sync_region.rs b/src/datanode/src/heartbeat/handler/sync_region.rs new file mode 100644 index 0000000000..476cb20b00 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/sync_region.rs @@ -0,0 +1,192 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::instruction::{InstructionReply, SyncRegion, SyncRegionReply, SyncRegionsReply}; +use common_telemetry::{error, info, warn}; +use futures::future::join_all; + +use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; + +/// Handler for [SyncRegion] instruction. +/// It syncs the region from a manifest or another region. +#[derive(Debug, Clone, Copy, Default)] +pub struct SyncRegionHandler; + +#[async_trait::async_trait] +impl InstructionHandler for SyncRegionHandler { + type Instruction = Vec; + + /// Handles a batch of [SyncRegion] instructions. + async fn handle( + &self, + ctx: &HandlerContext, + regions: Self::Instruction, + ) -> Option { + let futures = regions + .into_iter() + .map(|sync_region| Self::handle_sync_region(ctx, sync_region)); + let results = join_all(futures).await; + + Some(InstructionReply::SyncRegions(SyncRegionsReply::new( + results, + ))) + } +} + +impl SyncRegionHandler { + /// Handles a single [SyncRegion] instruction. 
+ async fn handle_sync_region( + ctx: &HandlerContext, + SyncRegion { region_id, request }: SyncRegion, + ) -> SyncRegionReply { + let Some(writable) = ctx.region_server.is_region_leader(region_id) else { + warn!("Region: {} is not found", region_id); + return SyncRegionReply { + region_id, + ready: false, + exists: false, + error: None, + }; + }; + + if !writable { + warn!("Region: {} is not writable", region_id); + return SyncRegionReply { + region_id, + ready: false, + exists: true, + error: Some("Region is not writable".into()), + }; + } + + match ctx.region_server.sync_region(region_id, request).await { + Ok(_) => { + info!("Successfully synced region: {}", region_id); + SyncRegionReply { + region_id, + ready: true, + exists: true, + error: None, + } + } + Err(e) => { + error!(e; "Failed to sync region: {}", region_id); + SyncRegionReply { + region_id, + ready: false, + exists: true, + error: Some(format!("{:?}", e)), + } + } + } + } +} + +#[cfg(test)] +mod tests { + use store_api::metric_engine_consts::METRIC_ENGINE_NAME; + use store_api::region_engine::{RegionRole, SyncRegionFromRequest}; + use store_api::storage::RegionId; + + use crate::heartbeat::handler::sync_region::SyncRegionHandler; + use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; + use crate::tests::{MockRegionEngine, mock_region_server}; + + #[tokio::test] + async fn test_handle_sync_region_not_found() { + let mut mock_region_server = mock_region_server(); + let (mock_engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME); + mock_region_server.register_engine(mock_engine); + + let handler_context = HandlerContext::new_for_test(mock_region_server); + let handler = SyncRegionHandler; + + let region_id = RegionId::new(1024, 1); + let sync_region = common_meta::instruction::SyncRegion { + region_id, + request: SyncRegionFromRequest::from_manifest(Default::default()), + }; + + let reply = handler + .handle(&handler_context, vec![sync_region]) + .await + .unwrap() + 
.expect_sync_regions_reply(); + + assert_eq!(reply.len(), 1); + assert_eq!(reply[0].region_id, region_id); + assert!(!reply[0].exists); + assert!(!reply[0].ready); + } + + #[tokio::test] + async fn test_handle_sync_region_not_writable() { + let mock_region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| { + r.mock_role = Some(Some(RegionRole::Follower)); + }); + mock_region_server.register_test_region(region_id, mock_engine); + + let handler_context = HandlerContext::new_for_test(mock_region_server); + let handler = SyncRegionHandler; + + let sync_region = common_meta::instruction::SyncRegion { + region_id, + request: SyncRegionFromRequest::from_manifest(Default::default()), + }; + + let reply = handler + .handle(&handler_context, vec![sync_region]) + .await + .unwrap() + .expect_sync_regions_reply(); + + assert_eq!(reply.len(), 1); + assert_eq!(reply[0].region_id, region_id); + assert!(reply[0].exists); + assert!(!reply[0].ready); + assert!(reply[0].error.is_some()); + } + + #[tokio::test] + async fn test_handle_sync_region_success() { + let mock_region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| { + r.mock_role = Some(Some(RegionRole::Leader)); + }); + mock_region_server.register_test_region(region_id, mock_engine); + + let handler_context = HandlerContext::new_for_test(mock_region_server); + let handler = SyncRegionHandler; + + let sync_region = common_meta::instruction::SyncRegion { + region_id, + request: SyncRegionFromRequest::from_manifest(Default::default()), + }; + + let reply = handler + .handle(&handler_context, vec![sync_region]) + .await + .unwrap() + .expect_sync_regions_reply(); + + assert_eq!(reply.len(), 1); + assert_eq!(reply[0].region_id, region_id); + assert!(reply[0].exists); + assert!(reply[0].ready); + 
assert!(reply[0].error.is_none()); + } +} diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 3486d7b416..4928e92545 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -115,12 +115,17 @@ pub type MockSetReadonlyGracefullyHandler = pub type MockGetMetadataHandler = Box Result + Send + Sync>; +pub type MockSyncRegionHandler = Box< + dyn Fn(RegionId, SyncRegionFromRequest) -> Result + Send + Sync, +>; + pub struct MockRegionEngine { sender: Sender<(RegionId, RegionRequest)>, pub(crate) handle_request_delay: Option, pub(crate) handle_request_mock_fn: Option, pub(crate) handle_set_readonly_gracefully_mock_fn: Option, pub(crate) handle_get_metadata_mock_fn: Option, + pub(crate) handle_sync_region_mock_fn: Option, pub(crate) mock_role: Option>, engine: String, } @@ -136,6 +141,7 @@ impl MockRegionEngine { handle_request_mock_fn: None, handle_set_readonly_gracefully_mock_fn: None, handle_get_metadata_mock_fn: None, + handle_sync_region_mock_fn: None, mock_role: None, engine: engine.to_string(), }), @@ -156,6 +162,7 @@ impl MockRegionEngine { handle_request_mock_fn: Some(mock_fn), handle_set_readonly_gracefully_mock_fn: None, handle_get_metadata_mock_fn: None, + handle_sync_region_mock_fn: None, mock_role: None, engine: engine.to_string(), }), @@ -176,6 +183,7 @@ impl MockRegionEngine { handle_request_mock_fn: None, handle_set_readonly_gracefully_mock_fn: None, handle_get_metadata_mock_fn: Some(mock_fn), + handle_sync_region_mock_fn: None, mock_role: None, engine: engine.to_string(), }), @@ -197,6 +205,7 @@ impl MockRegionEngine { handle_request_mock_fn: None, handle_set_readonly_gracefully_mock_fn: None, handle_get_metadata_mock_fn: None, + handle_sync_region_mock_fn: None, mock_role: None, engine: engine.to_string(), }; @@ -286,10 +295,14 @@ impl RegionEngine for MockRegionEngine { async fn sync_region( &self, - _region_id: RegionId, - _request: SyncRegionFromRequest, + region_id: RegionId, + request: SyncRegionFromRequest, ) -> 
Result { - unimplemented!() + if let Some(mock_fn) = &self.handle_sync_region_mock_fn { + return mock_fn(region_id, request).map_err(BoxedError::new); + }; + + Ok(SyncRegionFromResponse::Mito { synced: true }) } async fn remap_manifests( diff --git a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs index f9e5900cbb..f3dc0ee661 100644 --- a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs @@ -14,19 +14,15 @@ use std::any::Any; -use api::v1::meta::MailboxMessage; -use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply}; use common_procedure::{Context as ProcedureContext, Status}; -use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::OptionExt; use tokio::time::Instant; -use crate::error::{self, Error, Result}; -use crate::handler::HeartbeatMailbox; +use crate::error::{self, Result}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{Context, State}; -use crate::service::mailbox::Channel; +use crate::procedure::utils; /// Flushes the leader region before downgrading it. /// @@ -61,15 +57,6 @@ impl State for PreFlushRegion { } impl PreFlushRegion { - /// Builds flush leader region instruction. - fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction { - let pc = &ctx.persistent_ctx; - Instruction::FlushRegions(FlushRegions::sync_batch( - pc.region_ids.clone(), - FlushErrorStrategy::TryAll, - )) - } - /// Tries to flush a leader region. 
/// /// Ignore: @@ -89,109 +76,18 @@ impl PreFlushRegion { .context(error::ExceededDeadlineSnafu { operation: "Flush leader region", })?; - let flush_instruction = self.build_flush_leader_region_instruction(ctx); let region_ids = &ctx.persistent_ctx.region_ids; let leader = &ctx.persistent_ctx.from_peer; - let msg = MailboxMessage::json_message( - &format!("Flush leader region: {:?}", region_ids), - &format!("Metasrv@{}", ctx.server_addr()), - &format!("Datanode-{}@{}", leader.id, leader.addr), - common_time::util::current_time_millis(), - &flush_instruction, + utils::flush_region( + &ctx.mailbox, + &ctx.server_addr, + region_ids, + leader, + operation_timeout, + utils::ErrorStrategy::Ignore, ) - .with_context(|_| error::SerializeToJsonSnafu { - input: flush_instruction.to_string(), - })?; - - let ch = Channel::Datanode(leader.id); - let now = Instant::now(); - let result = ctx.mailbox.send(&ch, msg, operation_timeout).await; - - match result { - Ok(receiver) => match receiver.await { - Ok(msg) => { - let reply = HeartbeatMailbox::json_reply(&msg)?; - info!( - "Received flush leader region reply: {:?}, region: {:?}, elapsed: {:?}", - reply, - region_ids, - now.elapsed() - ); - - let reply_result = match reply { - InstructionReply::FlushRegions(flush_reply) => { - if flush_reply.results.len() != region_ids.len() { - return error::UnexpectedInstructionReplySnafu { - mailbox_message: msg.to_string(), - reason: format!( - "expect {} region flush result, but got {}", - region_ids.len(), - flush_reply.results.len() - ), - } - .fail(); - } - - match flush_reply.overall_success { - true => (true, None), - false => ( - false, - Some( - flush_reply - .results - .iter() - .filter_map(|(region_id, result)| match result { - Ok(_) => None, - Err(e) => Some(format!("{}: {}", region_id, e)), - }) - .collect::>() - .join("; "), - ), - ), - } - } - _ => { - return error::UnexpectedInstructionReplySnafu { - mailbox_message: msg.to_string(), - reason: "expect flush region reply", - } 
- .fail(); - } - }; - let (result, error) = reply_result; - - if let Some(error) = error { - warn!( - "Failed to flush leader regions {:?} on datanode {:?}, error: {}. Skip flush operation.", - region_ids, leader, &error - ); - } else if result { - info!( - "The flush leader regions {:?} on datanode {:?} is successful, elapsed: {:?}", - region_ids, - leader, - now.elapsed() - ); - } - - Ok(()) - } - Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu { - operation: "Flush leader regions", - } - .fail(), - Err(err) => Err(err), - }, - Err(Error::PusherNotFound { .. }) => { - warn!( - "Failed to flush leader regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.", - region_ids, leader - ); - Ok(()) - } - Err(err) => Err(err), - } + .await } } @@ -202,11 +98,13 @@ mod tests { use store_api::storage::RegionId; use super::*; + use crate::error::Error; use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context}; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; use crate::procedure::test_util::{ new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply, }; + use crate::service::mailbox::Channel; fn new_persistent_context() -> PersistentContext { test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index f734677803..a97a8f26ef 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -47,7 +47,7 @@ use common_procedure::{ BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata, }; -use common_telemetry::error; +use common_telemetry::{error, info}; use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -232,7 +232,10 @@ impl 
Context { &new_region_routes, table_id, )?; - + info!( + "Updating table route for table: {}, new region routes: {:?}", + table_id, new_region_routes + ); self.table_metadata_manager .update_table_route( table_id, @@ -262,6 +265,13 @@ impl Context { .await; Ok(()) } + + /// Returns the next operation timeout. + /// + /// If the next operation timeout is not set, it will return `None`. + pub fn next_operation_timeout(&self) -> Option { + Some(std::time::Duration::from_secs(10)) + } } #[async_trait::async_trait] @@ -335,6 +345,13 @@ impl Procedure for RepartitionProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; + let state_name = state.name(); + // Log state transition + common_telemetry::info!( + "Repartition procedure executing state: {}, table_id: {}", + state_name, + self.context.persistent_ctx.table_id + ); match state.next(&mut self.context, _ctx).await { Ok((next, status)) => { *state = next; diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index ceb50ba732..532090d6e3 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -65,6 +65,12 @@ impl State for AllocateRegion { &mut next_region_number, &self.plan_entries, ); + let plan_count = repartition_plan_entries.len(); + let to_allocate = Self::count_regions_to_allocate(&repartition_plan_entries); + info!( + "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}", + table_id, plan_count, to_allocate + ); // If no region to allocate, directly dispatch the plan. 
if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 { @@ -99,6 +105,20 @@ impl State for AllocateRegion { .await .context(error::AllocateWalOptionsSnafu { table_id })?; + let new_region_count = new_allocated_region_routes.len(); + let new_regions_brief: Vec<_> = new_allocated_region_routes + .iter() + .map(|route| { + let region_id = route.region.id; + let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default(); + format!("region_id: {}, peer: {}", region_id, peer) + }) + .collect(); + info!( + "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}", + table_id, new_region_count, new_regions_brief + ); + let _operating_guards = Self::register_operating_regions( &ctx.memory_region_keeper, &new_allocated_region_routes, @@ -137,7 +157,6 @@ impl AllocateRegion { Self { plan_entries } } - #[allow(dead_code)] fn register_operating_regions( memory_region_keeper: &MemoryRegionKeeperRef, region_routes: &[RegionRoute], @@ -155,7 +174,6 @@ impl AllocateRegion { Ok(operating_guards) } - #[allow(dead_code)] fn generate_region_routes( region_routes: &[RegionRoute], new_allocated_region_ids: &[RegionRoute], @@ -177,7 +195,6 @@ impl AllocateRegion { /// /// This method takes the allocation plan entries and converts them to repartition plan entries, /// updating `next_region_number` for each newly allocated region. - #[allow(dead_code)] fn convert_to_repartition_plans( table_id: TableId, next_region_number: &mut RegionNumber, @@ -196,7 +213,6 @@ impl AllocateRegion { } /// Collects all regions that need to be allocated from the repartition plan entries. - #[allow(dead_code)] fn collect_allocate_regions( repartition_plan_entries: &[RepartitionPlanEntry], ) -> Vec<&RegionDescriptor> { @@ -207,7 +223,6 @@ impl AllocateRegion { } /// Prepares region allocation data: region numbers and their partition expressions. 
- #[allow(dead_code)] fn prepare_region_allocation_data( allocate_regions: &[&RegionDescriptor], ) -> Result> { @@ -225,7 +240,6 @@ impl AllocateRegion { } /// Calculates the total number of regions that need to be allocated. - #[allow(dead_code)] fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize { repartition_plan_entries .iter() @@ -234,12 +248,10 @@ impl AllocateRegion { } /// Gets the next region number from the physical table route. - #[allow(dead_code)] fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber { max_region_number + 1 } - #[allow(dead_code)] async fn allocate_regions( node_manager: &NodeManagerRef, raw_table_info: &RawTableInfo, @@ -252,12 +264,14 @@ impl AllocateRegion { &raw_table_info.name, ); let table_id = raw_table_info.ident.table_id; - let request = build_template_from_raw_table_info(raw_table_info) + let request = build_template_from_raw_table_info(raw_table_info, true) .context(error::BuildCreateRequestSnafu { table_id })?; let builder = CreateRequestBuilder::new(request, None); + let region_count = region_routes.len(); + let wal_region_count = wal_options.len(); info!( - "Allocating regions for table: {}, region_routes: {:?}, wal_options: {:?}", - table_id, region_routes, wal_options + "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}", + table_id, region_count, wal_region_count ); let executor = CreateTableExecutor::new(table_ref.into(), false, builder); executor diff --git a/src/meta-srv/src/procedure/repartition/collect.rs b/src/meta-srv/src/procedure/repartition/collect.rs index 3efc508a1d..6381345e4d 100644 --- a/src/meta-srv/src/procedure/repartition/collect.rs +++ b/src/meta-srv/src/procedure/repartition/collect.rs @@ -15,7 +15,7 @@ use std::any::Any; use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher}; -use common_telemetry::error; +use common_telemetry::{error, info}; use serde::{Deserialize, Serialize}; use 
snafu::ResultExt; @@ -64,9 +64,10 @@ impl Collect { impl State for Collect { async fn next( &mut self, - _ctx: &mut Context, + ctx: &mut Context, procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { + let table_id = ctx.persistent_ctx.table_id; for procedure_meta in self.inflight_procedures.iter() { let procedure_id = procedure_meta.procedure_id; let group_id = procedure_meta.group_id; @@ -93,7 +94,16 @@ impl State for Collect { } } - if !self.failed_procedures.is_empty() || !self.unknown_procedures.is_empty() { + let inflight = self.inflight_procedures.len(); + let succeeded = self.succeeded_procedures.len(); + let failed = self.failed_procedures.len(); + let unknown = self.unknown_procedures.len(); + info!( + "Collected repartition group results for table_id: {}, inflight: {}, succeeded: {}, failed: {}, unknown: {}", + table_id, inflight, succeeded, failed, unknown + ); + + if failed > 0 || unknown > 0 { // TODO(weny): retry the failed or unknown procedures. } diff --git a/src/meta-srv/src/procedure/repartition/deallocate_region.rs b/src/meta-srv/src/procedure/repartition/deallocate_region.rs index 48bfc31936..ff919e2e4f 100644 --- a/src/meta-srv/src/procedure/repartition/deallocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/deallocate_region.rs @@ -62,9 +62,10 @@ impl State for DeallocateRegion { .flat_map(|p| p.pending_deallocate_region_ids.iter()) .cloned() .collect::>(); + let dealloc_count = pending_deallocate_region_ids.len(); info!( - "Deallocating regions: {:?} for table: {} during repartition procedure", - pending_deallocate_region_ids, table_id + "Deallocating regions for repartition, table_id: {}, count: {}, regions: {:?}", + table_id, dealloc_count, pending_deallocate_region_ids ); let table_lock = TableLock::Write(table_id).into(); @@ -111,7 +112,6 @@ impl State for DeallocateRegion { } impl DeallocateRegion { - #[allow(dead_code)] async fn deallocate_regions( node_manager: &NodeManagerRef, leader_region_registry: 
&LeaderRegionRegistryRef, @@ -136,7 +136,6 @@ impl DeallocateRegion { Ok(()) } - #[allow(dead_code)] fn filter_deallocatable_region_routes( table_id: TableId, region_routes: &[RegionRoute], @@ -161,7 +160,6 @@ impl DeallocateRegion { .collect::>() } - #[allow(dead_code)] fn generate_region_routes( region_routes: &[RegionRoute], pending_deallocate_region_ids: &HashSet, diff --git a/src/meta-srv/src/procedure/repartition/dispatch.rs b/src/meta-srv/src/procedure/repartition/dispatch.rs index d8d7b2fc7b..5b5f6e86fc 100644 --- a/src/meta-srv/src/procedure/repartition/dispatch.rs +++ b/src/meta-srv/src/procedure/repartition/dispatch.rs @@ -16,7 +16,9 @@ use std::any::Any; use std::collections::HashMap; use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status}; +use common_telemetry::info; use serde::{Deserialize, Serialize}; +use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::storage::RegionId; use crate::error::Result; @@ -28,7 +30,6 @@ use crate::procedure::repartition::{self, Context, State}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Dispatch; -#[allow(dead_code)] fn build_region_mapping( source_regions: &[RegionDescriptor], target_regions: &[RegionDescriptor], @@ -57,8 +58,12 @@ impl State for Dispatch { _procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { let table_id = ctx.persistent_ctx.table_id; - let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len()); - let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len()); + let table_info_value = ctx.get_table_info_value().await?; + let table_engine = table_info_value.table_info.meta.engine; + let sync_region = table_engine == METRIC_ENGINE_NAME; + let plan_count = ctx.persistent_ctx.plans.len(); + let mut procedures = Vec::with_capacity(plan_count); + let mut procedure_metas = Vec::with_capacity(plan_count); for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() { let region_mapping = 
build_region_mapping( &plan.source_regions, @@ -73,6 +78,9 @@ impl State for Dispatch { plan.source_regions.clone(), plan.target_regions.clone(), region_mapping, + sync_region, + plan.allocated_region_ids.clone(), + plan.pending_deallocate_region_ids.clone(), ); let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx); @@ -85,6 +93,14 @@ impl State for Dispatch { procedures.push(procedure); } + let group_ids: Vec<_> = procedure_metas.iter().map(|m| m.group_id).collect(); + info!( + "Dispatch repartition groups for table_id: {}, group_count: {}, group_ids: {:?}", + table_id, + group_ids.len(), + group_ids + ); + Ok(( Box::new(Collect::new(procedure_metas)), Status::suspended(procedures, true), diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index a80d3388cf..1604205044 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -17,6 +17,7 @@ pub(crate) mod enter_staging_region; pub(crate) mod remap_manifest; pub(crate) mod repartition_end; pub(crate) mod repartition_start; +pub(crate) mod sync_region; pub(crate) mod update_metadata; pub(crate) mod utils; @@ -40,7 +41,7 @@ use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, StringKey, UserMetadata, }; -use common_telemetry::error; +use common_telemetry::{error, info}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::{RegionId, TableId}; @@ -55,7 +56,6 @@ use crate::service::mailbox::MailboxRef; pub type GroupId = Uuid; -#[allow(dead_code)] pub struct RepartitionGroupProcedure { state: Box, context: Context, @@ -113,6 +113,14 @@ impl Procedure for RepartitionGroupProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; + let state_name = state.name(); + // Log state transition + 
common_telemetry::info!( + "Repartition group procedure executing state: {}, group id: {}, table id: {}", + state_name, + self.context.persistent_ctx.group_id, + self.context.persistent_ctx.table_id + ); match state.next(&mut self.context, _ctx).await { Ok((next, status)) => { @@ -221,9 +229,16 @@ pub struct PersistentContext { /// The staging manifest paths of the repartition group. /// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state. pub staging_manifest_paths: HashMap, + /// Whether sync region is needed for this group. + pub sync_region: bool, + /// The region ids of the newly allocated regions. + pub allocated_region_ids: Vec, + /// The region ids of the regions that are pending deallocation. + pub pending_deallocate_region_ids: Vec, } impl PersistentContext { + #[allow(clippy::too_many_arguments)] pub fn new( group_id: GroupId, table_id: TableId, @@ -232,6 +247,9 @@ impl PersistentContext { sources: Vec, targets: Vec, region_mapping: HashMap>, + sync_region: bool, + allocated_region_ids: Vec, + pending_deallocate_region_ids: Vec, ) -> Self { Self { group_id, @@ -243,6 +261,9 @@ impl PersistentContext { region_mapping, group_prepare_result: None, staging_manifest_paths: HashMap::new(), + sync_region, + allocated_region_ids, + pending_deallocate_region_ids, } } @@ -334,6 +355,7 @@ impl Context { new_region_routes: Vec, ) -> Result<()> { let table_id = self.persistent_ctx.table_id; + let group_id = self.persistent_ctx.group_id; // Safety: prepare result is set in [RepartitionStart] state. let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap(); let central_region_datanode_table_value = self @@ -345,6 +367,10 @@ impl Context { .. 
} = &central_region_datanode_table_value.region_info; + info!( + "Updating table route for table: {}, group_id: {}, new region routes: {:?}", + table_id, group_id, new_region_routes + ); self.table_metadata_manager .update_table_route( table_id, diff --git a/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs index 973c14f5a5..f80941cda0 100644 --- a/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs +++ b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs @@ -31,7 +31,7 @@ use store_api::storage::RegionId; use crate::error::{self, Error, Result}; use crate::handler::HeartbeatMailbox; -use crate::procedure::repartition::group::repartition_end::RepartitionEnd; +use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::group::utils::{ HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results, }; @@ -52,7 +52,10 @@ impl State for ApplyStagingManifest { ) -> Result<(Box, Status)> { self.apply_staging_manifests(ctx).await?; - Ok((Box::new(RepartitionEnd), Status::executing(true))) + Ok(( + Box::new(UpdateMetadata::ExitStaging), + Status::executing(true), + )) } fn as_any(&self) -> &dyn Any { @@ -125,7 +128,6 @@ impl ApplyStagingManifest { }) } - #[allow(dead_code)] async fn apply_staging_manifests(&self, ctx: &mut Context) -> Result<()> { let table_id = ctx.persistent_ctx.table_id; let group_id = ctx.persistent_ctx.group_id; @@ -150,6 +152,7 @@ impl ApplyStagingManifest { operation: "Apply staging manifests", })?; + let instruction_region_count: usize = instructions.values().map(|v| v.len()).sum(); let (peers, tasks): (Vec<_>, Vec<_>) = instructions .iter() .map(|(peer, apply_staging_manifests)| { @@ -166,8 +169,11 @@ impl ApplyStagingManifest { }) .unzip(); info!( - "Sent apply staging manifests instructions to peers: {:?} for repartition table {}, group id {}", - peers,
table_id, group_id + "Sent apply staging manifests instructions, table_id: {}, group_id: {}, peers: {}, regions: {}", + table_id, + group_id, + peers.len(), + instruction_region_count ); let format_err_msg = |idx: usize, error: &Error| { @@ -292,11 +298,7 @@ impl ApplyStagingManifest { match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; - info!( - "Received apply staging manifests reply: {:?}, elapsed: {:?}", - reply, - now.elapsed() - ); + let elapsed = now.elapsed(); let InstructionReply::ApplyStagingManifests(ApplyStagingManifestsReply { replies }) = reply else { @@ -306,9 +308,23 @@ impl ApplyStagingManifest { } .fail(); }; + let total = replies.len(); + let (mut ready, mut not_ready, mut with_error) = (0, 0, 0); + let region_ids = replies.iter().map(|r| r.region_id).collect::>(); for reply in replies { + if reply.error.is_some() { + with_error += 1; + } else if reply.ready { + ready += 1; + } else { + not_ready += 1; + } Self::handle_apply_staging_manifest_reply(&reply, &now, peer)?; } + info!( + "Received apply staging manifests reply, peer: {:?}, total_regions: {}, regions:{:?}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}", + peer, total, region_ids, ready, not_ready, with_error, elapsed + ); Ok(()) } diff --git a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs index d8f30b21bd..3ab911b942 100644 --- a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs @@ -23,7 +23,7 @@ use common_meta::instruction::{ use common_meta::peer::Peer; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::info; -use futures::future::join_all; +use futures::future::{join_all, try_join_all}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; @@ -35,6 +35,7 @@ use 
crate::procedure::repartition::group::utils::{ }; use crate::procedure::repartition::group::{Context, GroupPrepareResult, State}; use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::utils::{self, ErrorStrategy}; use crate::service::mailbox::{Channel, MailboxRef}; #[derive(Debug, Serialize, Deserialize)] @@ -48,6 +49,7 @@ impl State for EnterStagingRegion { ctx: &mut Context, _procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { + self.flush_pending_deallocate_regions(ctx).await?; self.enter_staging_regions(ctx).await?; Ok((Box::new(RemapManifest), Status::executing(true))) @@ -94,7 +96,6 @@ impl EnterStagingRegion { Ok(instructions) } - #[allow(dead_code)] async fn enter_staging_regions(&self, ctx: &mut Context) -> Result<()> { let table_id = ctx.persistent_ctx.table_id; let group_id = ctx.persistent_ctx.group_id; @@ -102,6 +103,8 @@ impl EnterStagingRegion { let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); let targets = &ctx.persistent_ctx.targets; let instructions = Self::build_enter_staging_instructions(prepare_result, targets)?; + let target_region_count = targets.len(); + let peer_count = instructions.len(); let operation_timeout = ctx.next_operation_timeout() .context(error::ExceededDeadlineSnafu { @@ -123,8 +126,8 @@ impl EnterStagingRegion { }) .unzip(); info!( - "Sent enter staging regions instructions to peers: {:?} for repartition table {}, group id {}", - peers, table_id, group_id + "Sent enter staging regions instructions, table_id: {}, group_id: {}, peers: {}, target_regions: {}", + table_id, group_id, peer_count, target_region_count ); let format_err_msg = |idx: usize, error: &Error| { @@ -242,11 +245,7 @@ impl EnterStagingRegion { match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; - info!( - "Received enter staging regions reply: {:?}, elapsed: {:?}", - reply, - now.elapsed() - ); + let elapsed = now.elapsed(); let 
InstructionReply::EnterStagingRegions(EnterStagingRegionsReply { replies }) = reply else { @@ -256,9 +255,22 @@ impl EnterStagingRegion { } .fail(); }; + let total = replies.len(); + let (mut ready, mut not_ready, mut with_error) = (0, 0, 0); for reply in replies { + if reply.error.is_some() { + with_error += 1; + } else if reply.ready { + ready += 1; + } else { + not_ready += 1; + } Self::handle_enter_staging_region_reply(&reply, &now, peer)?; } + info!( + "Received enter staging regions reply, peer: {:?}, total_regions: {}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}", + peer, total, ready, not_ready, with_error, elapsed + ); Ok(()) } @@ -320,6 +332,61 @@ impl EnterStagingRegion { Ok(()) } + + async fn flush_pending_deallocate_regions(&self, ctx: &mut Context) -> Result<()> { + let pending_deallocate_region_ids = &ctx.persistent_ctx.pending_deallocate_region_ids; + if pending_deallocate_region_ids.is_empty() { + return Ok(()); + } + + let table_id = ctx.persistent_ctx.table_id; + let group_id = ctx.persistent_ctx.group_id; + let operation_timeout = + ctx.next_operation_timeout() + .context(error::ExceededDeadlineSnafu { + operation: "Flush pending deallocate regions", + })?; + let result = &ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); + let source_routes = result + .source_routes + .iter() + .filter(|route| pending_deallocate_region_ids.contains(&route.region.id)) + .cloned() + .collect::>(); + let peer_region_ids_map = group_region_routes_by_peer(&source_routes); + info!( + "Flushing pending deallocate regions, table_id: {}, group_id: {}, peer_region_ids_map: {:?}", + table_id, group_id, peer_region_ids_map + ); + let now = Instant::now(); + let tasks = peer_region_ids_map + .iter() + .map(|(peer, region_ids)| { + utils::flush_region( + &ctx.mailbox, + &ctx.server_addr, + region_ids, + peer, + operation_timeout, + ErrorStrategy::Retry, + ) + }) + .collect::>(); + + try_join_all(tasks).await?; + info!( + "Flushed pending deallocate 
regions: {:?}, table_id: {}, group_id: {}, elapsed: {:?}", + source_routes + .iter() + .map(|route| route.region.id) + .collect::>(), + table_id, + group_id, + now.elapsed() + ); + + Ok(()) + } } #[cfg(test)] diff --git a/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs b/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs index af4b3bcf45..d8815b47b8 100644 --- a/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs +++ b/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs @@ -65,6 +65,13 @@ impl State for RemapManifest { .await?; let table_id = ctx.persistent_ctx.table_id; let group_id = ctx.persistent_ctx.group_id; + let manifest_count = manifest_paths.len(); + let input_region_count = ctx.persistent_ctx.sources.len(); + let target_region_count = ctx.persistent_ctx.targets.len(); + info!( + "Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}", + table_id, group_id, input_region_count, target_region_count, manifest_count + ); if manifest_paths.len() != ctx.persistent_ctx.targets.len() { warn!( @@ -156,11 +163,7 @@ impl RemapManifest { match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; - info!( - "Received remap manifest reply: {:?}, elapsed: {:?}", - reply, - now.elapsed() - ); + let elapsed = now.elapsed(); let InstructionReply::RemapManifest(reply) = reply else { return error::UnexpectedInstructionReplySnafu { mailbox_message: msg.to_string(), @@ -168,6 +171,11 @@ impl RemapManifest { } .fail(); }; + let manifest_count = reply.manifest_paths.len(); + info!( + "Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}", + remap.region_id, manifest_count, elapsed + ); Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer) } diff --git a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs 
b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs index fd61c4f8d3..3c8f011584 100644 --- a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::any::Any; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use common_meta::rpc::router::RegionRoute; use common_procedure::{Context as ProcedureContext, Status}; @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use crate::error::{self, Result}; +use crate::procedure::repartition::group::sync_region::SyncRegion; use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::group::{ Context, GroupId, GroupPrepareResult, State, region_routes, @@ -56,7 +57,6 @@ impl RepartitionStart { /// Ensures that both source and target regions are present in the region routes. /// /// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning). 
- #[allow(dead_code)] fn ensure_route_present( group_id: GroupId, region_routes: &[RegionRoute], @@ -172,6 +172,28 @@ impl State for RepartitionStart { ctx.persistent_ctx.targets.len() ); + if ctx.persistent_ctx.sync_region { + let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); + let allocated_region_ids: HashSet<_> = ctx + .persistent_ctx + .allocated_region_ids + .iter() + .copied() + .collect(); + let region_routes: Vec<_> = prepare_result + .target_routes + .iter() + .filter(|route| allocated_region_ids.contains(&route.region.id)) + .cloned() + .collect(); + if !region_routes.is_empty() { + return Ok(( + Box::new(SyncRegion { region_routes }), + Status::executing(true), + )); + } + } + Ok(( Box::new(UpdateMetadata::ApplyStaging), Status::executing(true), diff --git a/src/meta-srv/src/procedure/repartition/group/sync_region.rs b/src/meta-srv/src/procedure/repartition/group/sync_region.rs new file mode 100644 index 0000000000..6e3b6ac35a --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group/sync_region.rs @@ -0,0 +1,445 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::Any; +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use api::v1::meta::MailboxMessage; +use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply}; +use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use futures::future::join_all; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::region_engine::SyncRegionFromRequest; +use store_api::storage::RegionId; + +use crate::error::{self, Error, Result}; +use crate::handler::HeartbeatMailbox; +use crate::procedure::repartition::group::update_metadata::UpdateMetadata; +use crate::procedure::repartition::group::utils::{ + HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results, +}; +use crate::procedure::repartition::group::{Context, State}; +use crate::procedure::utils::ErrorStrategy; +use crate::service::mailbox::{Channel, MailboxRef}; + +const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3; + +/// The state of syncing regions for a repartition group. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SyncRegion { + pub region_routes: Vec, +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for SyncRegion { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + Self::flush_central_region(ctx).await?; + self.sync_regions(ctx).await?; + + Ok(( + Box::new(UpdateMetadata::ApplyStaging), + Status::executing(true), + )) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl SyncRegion { + async fn flush_central_region(ctx: &mut Context) -> Result<()> { + let operation_timeout = + ctx.next_operation_timeout() + .context(error::ExceededDeadlineSnafu { + operation: "Flush central region", + })?; + let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); + + crate::procedure::utils::flush_region( + &ctx.mailbox, + &ctx.server_addr, + &[prepare_result.central_region], + &prepare_result.central_region_datanode, + operation_timeout, + ErrorStrategy::Retry, + ) + .await + } + + /// Builds instructions to sync regions on datanodes. + fn build_sync_region_instructions( + central_region: RegionId, + region_routes: &[RegionRoute], + ) -> HashMap> { + let target_region_routes_by_peer = group_region_routes_by_peer(region_routes); + let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len()); + + for (peer, region_ids) in target_region_routes_by_peer { + let sync_regions = region_ids + .into_iter() + .map(|region_id| { + let request = SyncRegionFromRequest::FromRegion { + source_region_id: central_region, + parallelism: DEFAULT_SYNC_REGION_PARALLELISM, + }; + common_meta::instruction::SyncRegion { region_id, request } + }) + .collect(); + instructions.insert((*peer).clone(), sync_regions); + } + + instructions + } + + /// Syncs regions on datanodes. 
+ async fn sync_regions(&self, ctx: &mut Context) -> Result<()> { + let table_id = ctx.persistent_ctx.table_id; + let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); + let instructions = Self::build_sync_region_instructions( + prepare_result.central_region, + &self.region_routes, + ); + let operation_timeout = + ctx.next_operation_timeout() + .context(error::ExceededDeadlineSnafu { + operation: "Sync regions", + })?; + + let (peers, tasks): (Vec<_>, Vec<_>) = instructions + .iter() + .map(|(peer, sync_regions)| { + ( + peer, + Self::sync_region( + &ctx.mailbox, + &ctx.server_addr, + peer, + sync_regions, + operation_timeout, + ), + ) + }) + .unzip(); + + info!( + "Sent sync regions instructions to peers: {:?} for repartition table {}", + peers, table_id + ); + + let format_err_msg = |idx: usize, error: &Error| { + let peer = peers[idx]; + format!( + "Failed to sync regions on datanode {:?}, error: {:?}", + peer, error + ) + }; + + let results = join_all(tasks).await; + let result = handle_multiple_results(&results); + + match result { + HandleMultipleResult::AllSuccessful => Ok(()), + HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu { + reason: format!( + "All retryable errors during syncing regions for repartition table {}: {:?}", + table_id, + retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(",") + ), + } + .fail(), + HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu { + violated: format!( + "All non retryable errors during syncing regions for repartition table {}: {:?}", + table_id, + non_retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(",") + ), + } + .fail(), + HandleMultipleResult::PartialRetryable { + retryable_errors, + non_retryable_errors, + } => error::UnexpectedSnafu { + violated: format!( + "Partial retryable errors during syncing regions for repartition 
table {}: {:?}, non retryable errors: {:?}", + table_id, + retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(","), + non_retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(","), + ), + } + .fail(), + } + } + + /// Syncs regions on a datanode. + async fn sync_region( + mailbox: &MailboxRef, + server_addr: &str, + peer: &Peer, + sync_regions: &[common_meta::instruction::SyncRegion], + timeout: Duration, + ) -> Result<()> { + let ch = Channel::Datanode(peer.id); + let instruction = Instruction::SyncRegions(sync_regions.to_vec()); + let message = MailboxMessage::json_message( + &format!( + "Sync regions: {:?}", + sync_regions.iter().map(|r| r.region_id).collect::>() + ), + &format!("Metasrv@{}", server_addr), + &format!("Datanode-{}@{}", peer.id, peer.addr), + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + let now = std::time::Instant::now(); + let receiver = mailbox.send(&ch, message, timeout).await; + + let receiver = match receiver { + Ok(receiver) => receiver, + Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu { + reason: format!( + "Pusher not found for sync regions on datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ), + } + .fail()?, + Err(err) => { + return Err(err); + } + }; + + match receiver.await { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received sync regions reply: {:?}, elapsed: {:?}", + reply, + now.elapsed() + ); + let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect sync regions reply", + } + .fail(); + }; + for reply in replies { + Self::handle_sync_region_reply(&reply, &now, peer)?; + } + Ok(()) + } + Err(error::Error::MailboxTimeout { .. 
}) => { + let reason = format!( + "Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ); + error::RetryLaterSnafu { reason }.fail() + } + Err(err) => Err(err), + } + } + + fn handle_sync_region_reply( + SyncRegionReply { + region_id, + ready, + exists, + error, + }: &SyncRegionReply, + now: &Instant, + peer: &Peer, + ) -> Result<()> { + ensure!( + exists, + error::UnexpectedSnafu { + violated: format!( + "Region {} doesn't exist on datanode {:?}, elapsed: {:?}", + region_id, + peer, + now.elapsed() + ) + } + ); + + if let Some(error) = error { + return error::RetryLaterSnafu { + reason: format!( + "Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}", + region_id, + peer, + error, + now.elapsed() + ), + } + .fail(); + } + + ensure!( + ready, + error::RetryLaterSnafu { + reason: format!( + "Region {} failed to sync on datanode {:?}, elapsed: {:?}", + region_id, + peer, + now.elapsed() + ), + } + ); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::region_engine::SyncRegionFromRequest; + use store_api::storage::RegionId; + + use crate::error::Error; + use crate::procedure::repartition::group::GroupPrepareResult; + use crate::procedure::repartition::group::sync_region::SyncRegion; + use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context}; + use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply}; + use crate::service::mailbox::Channel; + + #[test] + fn test_build_sync_region_instructions() { + let table_id = 1024; + let central_region = RegionId::new(table_id, 1); + let region_routes = vec![RegionRoute { + region: Region { + id: RegionId::new(table_id, 3), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]; + + let instructions = + 
SyncRegion::build_sync_region_instructions(central_region, &region_routes); + assert_eq!(instructions.len(), 1); + let peer_instructions = instructions.get(&Peer::empty(1)).unwrap(); + assert_eq!(peer_instructions.len(), 1); + assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3)); + let SyncRegionFromRequest::FromRegion { + source_region_id, .. + } = &peer_instructions[0].request + else { + panic!("expect from region request"); + }; + assert_eq!(*source_region_id, central_region); + } + + fn test_prepare_result(table_id: u32) -> GroupPrepareResult { + GroupPrepareResult { + source_routes: vec![], + target_routes: vec![], + central_region: RegionId::new(table_id, 1), + central_region_datanode: Peer::empty(1), + } + } + + #[tokio::test] + async fn test_sync_regions_all_successful() { + let mut env = TestingEnv::new(); + let table_id = 1024; + let mut persistent_context = new_persistent_context(table_id, vec![], vec![]); + persistent_context.group_prepare_result = Some(test_prepare_result(table_id)); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + env.mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(1), tx) + .await; + send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| { + Ok(new_sync_region_reply( + id, + RegionId::new(1024, 3), + true, + true, + None, + )) + }); + + let mut ctx = env.create_context(persistent_context); + let region_routes = vec![RegionRoute { + region: Region { + id: RegionId::new(table_id, 3), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]; + let sync_region = SyncRegion { region_routes }; + + sync_region.sync_regions(&mut ctx).await.unwrap(); + } + + #[tokio::test] + async fn test_sync_regions_retryable() { + let env = TestingEnv::new(); + let table_id = 1024; + let mut persistent_context = new_persistent_context(table_id, vec![], vec![]); + persistent_context.group_prepare_result = Some(test_prepare_result(table_id)); + + let mut ctx =
env.create_context(persistent_context); + let region_routes = vec![RegionRoute { + region: Region { + id: RegionId::new(table_id, 3), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]; + let sync_region = SyncRegion { region_routes }; + + let err = sync_region.sync_regions(&mut ctx).await.unwrap_err(); + assert_matches!(err, Error::RetryLater { .. }); + } +} diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata.rs index ecd9a93046..443d8e3b09 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata.rs @@ -13,6 +13,7 @@ // limitations under the License. pub(crate) mod apply_staging_region; +pub(crate) mod exit_staging_region; pub(crate) mod rollback_staging_region; use std::any::Any; @@ -28,11 +29,14 @@ use crate::procedure::repartition::group::repartition_end::RepartitionEnd; use crate::procedure::repartition::group::{Context, State}; #[derive(Debug, Serialize, Deserialize)] +#[allow(clippy::enum_variant_names)] pub enum UpdateMetadata { /// Applies the new partition expressions for staging regions. ApplyStaging, /// Rolls back the new partition expressions for staging regions. RollbackStaging, + /// Exits the staging regions. 
+ ExitStaging, } #[async_trait::async_trait] @@ -62,7 +66,18 @@ impl State for UpdateMetadata { if let Err(err) = ctx.invalidate_table_cache().await { warn!( - "Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}" + err; + "Failed to broadcast the invalidate table cache message during the rollback staging regions" + ); + }; + Ok((Box::new(RepartitionEnd), Status::executing(false))) + } + UpdateMetadata::ExitStaging => { + self.exit_staging_regions(ctx).await?; + if let Err(err) = ctx.invalidate_table_cache().await { + warn!( + err; + "Failed to broadcast the invalidate table cache message during the exit staging regions" ); }; Ok((Box::new(RepartitionEnd), Status::executing(false))) diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs index 6f342931a8..17c730a3c2 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use common_error::ext::BoxedError; use common_meta::rpc::router::RegionRoute; -use common_telemetry::error; +use common_telemetry::{error, info}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Result}; @@ -77,7 +77,6 @@ impl UpdateMetadata { /// - Source region not found. /// - Failed to update the table route. /// - Central region datanode table value not found. 
- #[allow(dead_code)] pub(crate) async fn apply_staging_regions(&self, ctx: &mut Context) -> Result<()> { let table_id = ctx.persistent_ctx.table_id; let group_id = ctx.persistent_ctx.group_id; @@ -90,6 +89,13 @@ impl UpdateMetadata { region_routes, )?; + let source_count = ctx.persistent_ctx.sources.len(); + let target_count = ctx.persistent_ctx.targets.len(); + info!( + "Apply staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}", + table_id, group_id, source_count, target_count + ); + if let Err(err) = ctx .update_table_route(&current_table_route_value, new_region_routes) .await diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs new file mode 100644 index 0000000000..4e82d4874b --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs @@ -0,0 +1,104 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; + +use common_error::ext::BoxedError; +use common_meta::rpc::router::RegionRoute; +use common_telemetry::{error, info}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{self, Result}; +use crate::procedure::repartition::group::update_metadata::UpdateMetadata; +use crate::procedure::repartition::group::{Context, GroupId, region_routes}; +use crate::procedure::repartition::plan::RegionDescriptor; + +impl UpdateMetadata { + fn exit_staging_region_routes( + group_id: GroupId, + sources: &[RegionDescriptor], + targets: &[RegionDescriptor], + current_region_routes: &[RegionRoute], + ) -> Result<Vec<RegionRoute>> { + let mut region_routes = current_region_routes.to_vec(); + let mut region_routes_map = region_routes + .iter_mut() + .map(|route| (route.region.id, route)) + .collect::<HashMap<_, _>>(); + + for target in targets { + let region_route = region_routes_map.get_mut(&target.region_id).context( + error::RepartitionTargetRegionMissingSnafu { + group_id, + region_id: target.region_id, + }, + )?; + region_route.clear_leader_staging(); + } + + for source in sources { + let region_route = region_routes_map.get_mut(&source.region_id).context( + error::RepartitionSourceRegionMissingSnafu { + group_id, + region_id: source.region_id, + }, + )?; + region_route.clear_leader_staging(); + } + + Ok(region_routes) + } + + /// Exits the staging regions. + /// + /// Abort: + /// - Table route is not physical. + /// - Target region not found. + /// - Source region not found. + /// - Failed to update the table route. + /// - Central region datanode table value not found. 
+ pub(crate) async fn exit_staging_regions(&self, ctx: &mut Context) -> Result<()> { + let table_id = ctx.persistent_ctx.table_id; + let group_id = ctx.persistent_ctx.group_id; + let current_table_route_value = ctx.get_table_route_value().await?; + let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?; + let new_region_routes = Self::exit_staging_region_routes( + group_id, + &ctx.persistent_ctx.sources, + &ctx.persistent_ctx.targets, + region_routes, + )?; + + let source_count = ctx.persistent_ctx.sources.len(); + let target_count = ctx.persistent_ctx.targets.len(); + info!( + "Exit staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}", + table_id, group_id, source_count, target_count + ); + + if let Err(err) = ctx + .update_table_route(&current_table_route_value, new_region_routes) + .await + { + error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"); + return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu { + reason: format!( + "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}" + ), + }); + }; + + Ok(()) + } +} diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs index 3d147d82ad..b9a50e7075 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use common_error::ext::BoxedError; use common_meta::rpc::router::RegionRoute; -use common_telemetry::error; +use common_telemetry::{error, info}; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Result}; @@ -29,7 +29,6 @@ impl UpdateMetadata { /// Abort: /// - Source region not 
found. /// - Target region not found. - #[allow(dead_code)] fn rollback_staging_region_routes( group_id: GroupId, source_routes: &[RegionRoute], @@ -74,7 +73,6 @@ impl UpdateMetadata { /// - Target region not found. /// - Failed to update the table route. /// - Central region datanode table value not found. - #[allow(dead_code)] pub(crate) async fn rollback_staging_regions(&self, ctx: &mut Context) -> Result<()> { let table_id = ctx.persistent_ctx.table_id; let group_id = ctx.persistent_ctx.group_id; @@ -89,6 +87,13 @@ impl UpdateMetadata { region_routes, )?; + let source_count = prepare_result.source_routes.len(); + let target_count = prepare_result.target_routes.len(); + info!( + "Rollback staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}", + table_id, group_id, source_count, target_count + ); + if let Err(err) = ctx .update_table_route(&current_table_route_value, new_region_routes) .await diff --git a/src/meta-srv/src/procedure/repartition/repartition_start.rs b/src/meta-srv/src/procedure/repartition/repartition_start.rs index 88e84546c8..ace33e77ee 100644 --- a/src/meta-srv/src/procedure/repartition/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/repartition_start.rs @@ -16,6 +16,7 @@ use std::any::Any; use common_meta::key::table_route::PhysicalTableRouteValue; use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::debug; use partition::expr::PartitionExpr; use partition::subtask::{self, RepartitionSubtask}; use serde::{Deserialize, Serialize}; @@ -69,6 +70,17 @@ impl State for RepartitionStart { ); let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?; + let plan_count = plans.len(); + let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum(); + let total_target_regions: usize = + plans.iter().map(|p| p.target_partition_exprs.len()).sum(); + common_telemetry::info!( + "Repartition start, table_id: {}, plans: {}, total_source_regions: 
{}, total_target_regions: {}", + table_id, + plan_count, + total_source_regions, + total_target_regions + ); if plans.is_empty() { return Ok((Box::new(RepartitionEnd), Status::done())); @@ -86,7 +98,6 @@ impl State for RepartitionStart { } impl RepartitionStart { - #[allow(dead_code)] fn build_plan( physical_route: &PhysicalTableRouteValue, from_exprs: &[PartitionExpr], @@ -106,7 +117,6 @@ impl RepartitionStart { )) } - #[allow(dead_code)] fn build_plan_entries( subtasks: Vec, source_index: &[RegionDescriptor], @@ -159,8 +169,9 @@ impl RepartitionStart { .find_map(|(region_id, existing_expr)| { (existing_expr == &expr_json).then_some(*region_id) }) - .with_context(|| error::RepartitionSourceExprMismatchSnafu { - expr: expr_json, + .with_context(|| error::RepartitionSourceExprMismatchSnafu { expr: &expr_json }) + .inspect_err(|_| { + debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions); })?; Ok(RegionDescriptor { diff --git a/src/meta-srv/src/procedure/repartition/test_util.rs b/src/meta-srv/src/procedure/repartition/test_util.rs index 35fdeee759..cb4042bbe8 100644 --- a/src/meta-srv/src/procedure/repartition/test_util.rs +++ b/src/meta-srv/src/procedure/repartition/test_util.rs @@ -96,5 +96,8 @@ pub fn new_persistent_context( region_mapping: HashMap::new(), group_prepare_result: None, staging_manifest_paths: HashMap::new(), + sync_region: false, + allocated_region_ids: vec![], + pending_deallocate_region_ids: vec![], } } diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 37a339e061..76f3f4249a 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -18,7 +18,8 @@ use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage}; use common_meta::instruction::{ DowngradeRegionReply, DowngradeRegionsReply, EnterStagingRegionReply, EnterStagingRegionsReply, - 
FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, UpgradeRegionsReply, + FlushRegionReply, InstructionReply, SimpleReply, SyncRegionReply, SyncRegionsReply, + UpgradeRegionReply, UpgradeRegionsReply, }; use common_meta::key::TableMetadataManagerRef; use common_meta::key::table_route::TableRouteValue; @@ -253,6 +254,34 @@ pub fn new_enter_staging_region_reply( } } +/// Generates a [InstructionReply::SyncRegions] reply. +pub fn new_sync_region_reply( + id: u64, + region_id: RegionId, + ready: bool, + exists: bool, + error: Option, +) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::SyncRegions(SyncRegionsReply::new(vec![ + SyncRegionReply { + region_id, + ready, + exists, + error, + }, + ]))) + .unwrap(), + )), + } +} + /// Mock the test data for WAL pruning. pub async fn new_wal_prune_metadata( table_metadata_manager: TableMetadataManagerRef, diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index b120b39b79..e12268f1c3 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -12,6 +12,185 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::time::Duration; + +use api::v1::meta::MailboxMessage; +use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply}; +use common_meta::peer::Peer; +use common_telemetry::{info, warn}; +use snafu::ResultExt; +use store_api::storage::RegionId; +use tokio::time::Instant; + +use crate::error::{self, Error, Result}; +use crate::handler::HeartbeatMailbox; +use crate::service::mailbox::{Channel, MailboxRef}; + +pub(crate) enum ErrorStrategy { + Ignore, + Retry, +} + +fn handle_flush_region_reply( + reply: &InstructionReply, + region_ids: &[RegionId], + msg: &MailboxMessage, +) -> Result<(bool, Option<String>)> { + let result = match reply { + InstructionReply::FlushRegions(flush_reply) => { + if flush_reply.results.len() != region_ids.len() { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: format!( + "expect {} region flush result, but got {}", + region_ids.len(), + flush_reply.results.len() + ), + } + .fail(); + } + + match flush_reply.overall_success { + true => (true, None), + false => ( + false, + Some( + flush_reply + .results + .iter() + .filter_map(|(region_id, result)| match result { + Ok(_) => None, + Err(e) => Some(format!("{}: {:?}", region_id, e)), + }) + .collect::<Vec<_>>() + .join("; "), + ), + ), + } + } + _ => { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect flush region reply", + } + .fail(); + } + }; + + Ok(result) +} + +/// Flushes the regions on the datanode. +/// +/// Retry Or Ignore: +/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable. +/// - Failed to flush region on the Datanode. +/// +/// Abort: +/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout. +/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply). +/// - [ExceededDeadline](error::Error::ExceededDeadline) +/// - Invalid JSON. 
+pub(crate) async fn flush_region( + mailbox: &MailboxRef, + server_addr: &str, + region_ids: &[RegionId], + datanode: &Peer, + timeout: Duration, + error_strategy: ErrorStrategy, +) -> Result<()> { + let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch( + region_ids.to_vec(), + FlushErrorStrategy::TryAll, + )); + + let msg = MailboxMessage::json_message( + &format!("Flush regions: {:?}", region_ids), + &format!("Metasrv@{}", server_addr), + &format!("Datanode-{}@{}", datanode.id, datanode.addr), + common_time::util::current_time_millis(), + &flush_instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: flush_instruction.to_string(), + })?; + + let ch = Channel::Datanode(datanode.id); + let now = Instant::now(); + let receiver = mailbox.send(&ch, msg, timeout).await; + let receiver = match receiver { + Ok(receiver) => receiver, + Err(error::Error::PusherNotFound { .. }) => match error_strategy { + ErrorStrategy::Ignore => { + warn!( + "Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). 
Skip flush operation.", + region_ids, datanode + ); + return Ok(()); + } + ErrorStrategy::Retry => error::RetryLaterSnafu { + reason: format!( + "Pusher not found for flush regions on datanode {:?}, elapsed: {:?}", + datanode, + now.elapsed() + ), + } + .fail()?, + }, + Err(err) => { + return Err(err); + } + }; + + match receiver.await { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}", + reply, + region_ids, + now.elapsed() + ); + let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?; + if let Some(error) = error { + match error_strategy { + ErrorStrategy::Ignore => { + warn!( + "Failed to flush regions {:?}, the datanode({}) error is ignored: {}", + region_ids, datanode, error + ); + } + ErrorStrategy::Retry => { + return error::RetryLaterSnafu { + reason: format!( + "Failed to flush regions {:?}, the datanode({}) error is retried: {}", + region_ids, + datanode, + error, + ), + } + .fail()?; + } + } + } else if result { + info!( + "The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}", + region_ids, + datanode, + now.elapsed() + ); + } + + Ok(()) + } + Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu { + operation: "Flush regions", + } + .fail(), + Err(err) => Err(err), + } +} + #[cfg(any(test, feature = "mock"))] pub mod mock { use std::io::Error; diff --git a/src/metric-engine/src/engine/drop.rs b/src/metric-engine/src/engine/drop.rs index 11e2fb7a5f..6cd5f22a78 100644 --- a/src/metric-engine/src/engine/drop.rs +++ b/src/metric-engine/src/engine/drop.rs @@ -14,7 +14,7 @@ //! 
Drop a metric region -use common_telemetry::info; +use common_telemetry::{debug, info}; use snafu::ResultExt; use store_api::region_engine::RegionEngine; use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest}; @@ -46,6 +46,15 @@ impl MetricEngineInner { .physical_region_states() .get(&data_region_id) { + debug!( + "Physical region {} is busy, there are still some logical regions: {:?}", + data_region_id, + state + .logical_regions() + .iter() + .map(|id| id.to_string()) + .collect::<Vec<_>>() + ); (true, !state.logical_regions().is_empty()) } else { // the second argument is not used, just pass in a dummy value diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index ab6d4cd702..8d97fcd166 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -314,11 +314,8 @@ impl MitoRegion { /// Sets the dropping state. /// You should call this method in the worker loop. - pub(crate) fn set_dropping(&self) -> Result<()> { - self.compare_exchange_state( - RegionLeaderState::Writable, - RegionRoleState::Leader(RegionLeaderState::Dropping), - ) + pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> { + self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping)) } /// Sets the truncating state. 
diff --git a/src/mito2/src/worker/handle_copy_region.rs b/src/mito2/src/worker/handle_copy_region.rs index e929013fc6..aa5b26448e 100644 --- a/src/mito2/src/worker/handle_copy_region.rs +++ b/src/mito2/src/worker/handle_copy_region.rs @@ -31,7 +31,7 @@ impl RegionWorkerLoop { let region_id = request.region_id; let source_region_id = request.source_region_id; let sender = request.sender; - let region = match self.regions.writable_region(region_id) { + let region = match self.regions.writable_non_staging_region(region_id) { Ok(region) => region, Err(e) => { let _ = sender.send(Err(e)); diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 9d36507407..8afe15bfde 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -42,12 +42,18 @@ where &mut self, region_id: RegionId, ) -> Result { - let region = self.regions.writable_non_staging_region(region_id)?; + let region = self.regions.writable_region(region_id)?; info!("Try to drop region: {}, worker: {}", region_id, self.id); + let is_staging = region.is_staging(); + let expect_state = if is_staging { + RegionLeaderState::Staging + } else { + RegionLeaderState::Writable + }; // Marks the region as dropping. - region.set_dropping()?; + region.set_dropping(expect_state)?; // Writes dropping marker // We rarely drop a region so we still operate in the worker loop. let region_dir = region.access_layer.build_region_dir(region_id); diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 5d5bf8f4ce..2a078f0a84 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -638,7 +638,7 @@ impl RegionStatistic { } /// Request to sync the region from a manifest or a region. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum SyncRegionFromRequest { /// Syncs the region using manifest information. /// Used in leader-follower manifest sync scenarios. 
diff --git a/tests/cases/distributed/repartition/repartition.result b/tests/cases/distributed/repartition/repartition.result index 05775c91f4..0df60a0ac2 100644 --- a/tests/cases/distributed/repartition/repartition.result +++ b/tests/cases/distributed/repartition/repartition.result @@ -99,3 +99,155 @@ DROP TABLE alter_repartition_table; Affected Rows: 0 +-- Metric engine repartition test +CREATE TABLE metric_physical_table ( + ts TIMESTAMP TIME INDEX, + host STRING, + cpu DOUBLE, + PRIMARY KEY(host) +) +PARTITION ON COLUMNS (host) ( + host < 'h1', + host >= 'h1' AND host < 'h2', + host >= 'h2' +) +ENGINE = metric +WITH ( + physical_metric_table = "true" +); + +Affected Rows: 0 + +CREATE TABLE logical_table_v1 ( + ts TIMESTAMP TIME INDEX, + host STRING PRIMARY KEY, + cpu DOUBLE, +) +ENGINE = metric +WITH ( + on_physical_table = "metric_physical_table" +); + +Affected Rows: 0 + +CREATE TABLE logical_table_v2 ( + ts TIMESTAMP TIME INDEX, + host STRING PRIMARY KEY, + cpu DOUBLE, +) +ENGINE = metric +WITH ( + on_physical_table = "metric_physical_table" +); + +Affected Rows: 0 + +-- Split physical table partition +ALTER TABLE metric_physical_table SPLIT PARTITION ( + host < 'h1' +) INTO ( + host < 'h0', + host >= 'h0' AND host < 'h1' +); + +Affected Rows: 0 + +SHOW CREATE TABLE metric_physical_table; + ++-----------------------+------------------------------------------------------+ +| Table | Create Table | ++-----------------------+------------------------------------------------------+ +| metric_physical_table | CREATE TABLE IF NOT EXISTS "metric_physical_table" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "host" STRING NULL, | +| | "cpu" DOUBLE NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host") | +| | ) | +| | PARTITION ON COLUMNS ("host") ( | +| | host < 'h0', | +| | host >= 'h1' AND host < 'h2', | +| | host >= 'h2', | +| | host >= 'h0' AND host < 'h1' | +| | ) | +| | ENGINE=metric | +| | WITH( | +| | physical_metric_table = 'true' | +| | ) | 
++-----------------------+------------------------------------------------------+ + +-- Verify select * works and returns empty +SELECT * FROM metric_physical_table; + +++ +++ + +SELECT * FROM logical_table_v1; + +++ +++ + +SELECT * FROM logical_table_v2; + +++ +++ + +-- Merge physical table partition +ALTER TABLE metric_physical_table MERGE PARTITION ( + host < 'h0', + host >= 'h0' AND host < 'h1' +); + +Affected Rows: 0 + +SHOW CREATE TABLE metric_physical_table; + ++-----------------------+------------------------------------------------------+ +| Table | Create Table | ++-----------------------+------------------------------------------------------+ +| metric_physical_table | CREATE TABLE IF NOT EXISTS "metric_physical_table" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "host" STRING NULL, | +| | "cpu" DOUBLE NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host") | +| | ) | +| | PARTITION ON COLUMNS ("host") ( | +| | host < 'h0' OR host >= 'h0' AND host < 'h1', | +| | host >= 'h1' AND host < 'h2', | +| | host >= 'h2' | +| | ) | +| | ENGINE=metric | +| | WITH( | +| | physical_metric_table = 'true' | +| | ) | ++-----------------------+------------------------------------------------------+ + +-- Verify select * works and returns empty +SELECT * FROM metric_physical_table; + +++ +++ + +SELECT * FROM logical_table_v1; + +++ +++ + +SELECT * FROM logical_table_v2; + +++ +++ + +DROP TABLE logical_table_v1; + +Affected Rows: 0 + +DROP TABLE logical_table_v2; + +Affected Rows: 0 + +DROP TABLE metric_physical_table; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/repartition/repartition.sql b/tests/cases/distributed/repartition/repartition.sql index 35fd01feb3..d2051a9a15 100644 --- a/tests/cases/distributed/repartition/repartition.sql +++ b/tests/cases/distributed/repartition/repartition.sql @@ -46,3 +46,78 @@ ALTER TABLE alter_repartition_table REPARTITION ( ); DROP TABLE alter_repartition_table; + +-- Metric engine repartition test +CREATE TABLE 
metric_physical_table ( + ts TIMESTAMP TIME INDEX, + host STRING, + cpu DOUBLE, + PRIMARY KEY(host) +) +PARTITION ON COLUMNS (host) ( + host < 'h1', + host >= 'h1' AND host < 'h2', + host >= 'h2' +) +ENGINE = metric +WITH ( + physical_metric_table = "true" +); + +CREATE TABLE logical_table_v1 ( + ts TIMESTAMP TIME INDEX, + host STRING PRIMARY KEY, + cpu DOUBLE, +) +ENGINE = metric +WITH ( + on_physical_table = "metric_physical_table" +); + +CREATE TABLE logical_table_v2 ( + ts TIMESTAMP TIME INDEX, + host STRING PRIMARY KEY, + cpu DOUBLE, +) +ENGINE = metric +WITH ( + on_physical_table = "metric_physical_table" +); + +-- Split physical table partition +ALTER TABLE metric_physical_table SPLIT PARTITION ( + host < 'h1' +) INTO ( + host < 'h0', + host >= 'h0' AND host < 'h1' +); + +SHOW CREATE TABLE metric_physical_table; + +-- Verify select * works and returns empty +SELECT * FROM metric_physical_table; + +SELECT * FROM logical_table_v1; + +SELECT * FROM logical_table_v2; + +-- Merge physical table partition +ALTER TABLE metric_physical_table MERGE PARTITION ( + host < 'h0', + host >= 'h0' AND host < 'h1' +); + +SHOW CREATE TABLE metric_physical_table; + +-- Verify select * works and returns empty +SELECT * FROM metric_physical_table; + +SELECT * FROM logical_table_v1; + +SELECT * FROM logical_table_v2; + +DROP TABLE logical_table_v1; + +DROP TABLE logical_table_v2; + +DROP TABLE metric_physical_table;