diff --git a/src/common/function/src/admin/migrate_region.rs b/src/common/function/src/admin/migrate_region.rs index 0a487973d3..b1f79c0c07 100644 --- a/src/common/function/src/admin/migrate_region.rs +++ b/src/common/function/src/admin/migrate_region.rs @@ -25,12 +25,13 @@ use session::context::QueryContextRef; use crate::handlers::ProcedureServiceHandlerRef; use crate::helper::cast_u64; -const DEFAULT_TIMEOUT_SECS: u64 = 30; +/// The default timeout for migrate region procedure. +const DEFAULT_TIMEOUT_SECS: u64 = 300; /// A function to migrate a region from source peer to target peer. /// Returns the submitted procedure id if success. Only available in cluster mode. /// -/// - `migrate_region(region_id, from_peer, to_peer)`, with timeout(30 seconds). +/// - `migrate_region(region_id, from_peer, to_peer)`, with timeout(300 seconds). /// - `migrate_region(region_id, from_peer, to_peer, timeout(secs))`. /// /// The parameters: diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index afdc14dff0..5e00437332 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -57,6 +57,8 @@ impl Display for RegionIdent { pub struct DowngradeRegionReply { /// Returns the `last_entry_id` if available. pub last_entry_id: Option, + /// Returns the `metadata_last_entry_id` if available (Only available for metric engine). + pub metadata_last_entry_id: Option, /// Indicates whether the region exists. pub exists: bool, /// Return error if any during the operation. @@ -136,16 +138,14 @@ pub struct DowngradeRegion { /// `None` stands for don't flush before downgrading the region. #[serde(default)] pub flush_timeout: Option, - /// Rejects all write requests after flushing. 
- pub reject_write: bool, } impl Display for DowngradeRegion { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "DowngradeRegion(region_id={}, flush_timeout={:?}, rejct_write={})", - self.region_id, self.flush_timeout, self.reject_write + "DowngradeRegion(region_id={}, flush_timeout={:?})", + self.region_id, self.flush_timeout, ) } } @@ -157,6 +157,8 @@ pub struct UpgradeRegion { pub region_id: RegionId, /// The `last_entry_id` of old leader region. pub last_entry_id: Option, + /// The `last_entry_id` of old leader metadata region (Only used for metric engine). + pub metadata_last_entry_id: Option, /// The timeout of waiting for a wal replay. /// /// `None` stands for no wait, diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index bf93d15128..17950847ed 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -220,7 +220,6 @@ mod tests { let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id: RegionId::new(2048, 1), flush_timeout: Some(Duration::from_secs(1)), - reject_write: false, }); assert!(heartbeat_handler .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))); @@ -229,6 +228,7 @@ mod tests { let instruction = Instruction::UpgradeRegion(UpgradeRegion { region_id, last_entry_id: None, + metadata_last_entry_id: None, replay_timeout: None, location_id: None, }); @@ -419,7 +419,6 @@ mod tests { let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id, flush_timeout: Some(Duration::from_secs(1)), - reject_write: false, }); let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); @@ -442,7 +441,6 @@ mod tests { let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id: RegionId::new(2048, 1), flush_timeout: Some(Duration::from_secs(1)), - reject_write: false, }); let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); let control = 
heartbeat_handler.handle(&mut ctx).await.unwrap(); diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs index 216a460921..d82e4e065b 100644 --- a/src/datanode/src/heartbeat/handler/downgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs @@ -14,7 +14,7 @@ use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply}; use common_telemetry::tracing::info; -use common_telemetry::warn; +use common_telemetry::{error, warn}; use futures_util::future::BoxFuture; use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; use store_api::region_request::{RegionFlushRequest, RegionRequest}; @@ -33,25 +33,32 @@ impl HandlerContext { .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower) .await { - Ok(SetRegionRoleStateResponse::Success { last_entry_id }) => { + Ok(SetRegionRoleStateResponse::Success(success)) => { Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id, + last_entry_id: success.last_entry_id(), + metadata_last_entry_id: success.metadata_last_entry_id(), exists: true, error: None, })) } Ok(SetRegionRoleStateResponse::NotFound) => { + warn!("Region: {region_id} is not found"); Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, + metadata_last_entry_id: None, exists: false, error: None, })) } - Err(err) => Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id: None, - exists: true, - error: Some(format!("{err:?}")), - })), + Err(err) => { + error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower); + Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id: None, + metadata_last_entry_id: None, + exists: true, + error: Some(format!("{err:?}")), + })) + } } } @@ -60,7 +67,6 @@ impl HandlerContext { DowngradeRegion { region_id, flush_timeout, - reject_write, }: 
DowngradeRegion, ) -> BoxFuture<'static, Option> { Box::pin(async move { @@ -68,6 +74,7 @@ impl HandlerContext { warn!("Region: {region_id} is not found"); return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, + metadata_last_entry_id: None, exists: false, error: None, })); @@ -89,33 +96,35 @@ impl HandlerContext { return self.downgrade_to_follower_gracefully(region_id).await; }; - if reject_write { - // Sets region to downgrading, the downgrading region will reject all write requests. - match self - .region_server - .set_region_role_state_gracefully( - region_id, - SettableRegionRoleState::DowngradingLeader, - ) - .await - { - Ok(SetRegionRoleStateResponse::Success { .. }) => {} - Ok(SetRegionRoleStateResponse::NotFound) => { - warn!("Region: {region_id} is not found"); - return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id: None, - exists: false, - error: None, - })); - } - Err(err) => { - warn!(err; "Failed to convert region to downgrading leader"); - return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id: None, - exists: true, - error: Some(format!("{err:?}")), - })); - } + // Sets region to downgrading, + // the downgrading region will reject all write requests. + // However, the downgrading region will still accept read, flush requests. + match self + .region_server + .set_region_role_state_gracefully( + region_id, + SettableRegionRoleState::DowngradingLeader, + ) + .await + { + Ok(SetRegionRoleStateResponse::Success { .. 
}) => {} + Ok(SetRegionRoleStateResponse::NotFound) => { + warn!("Region: {region_id} is not found"); + return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id: None, + metadata_last_entry_id: None, + exists: false, + error: None, + })); + } + Err(err) => { + error!(err; "Failed to convert region to downgrading leader"); + return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id: None, + metadata_last_entry_id: None, + exists: true, + error: Some(format!("{err:?}")), + })); } } @@ -144,20 +153,25 @@ impl HandlerContext { } let mut watcher = register_result.into_watcher(); - let result = self.catchup_tasks.wait(&mut watcher, flush_timeout).await; + let result = self.downgrade_tasks.wait(&mut watcher, flush_timeout).await; match result { WaitResult::Timeout => { Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, + metadata_last_entry_id: None, exists: true, - error: Some(format!("Flush region: {region_id} is timeout")), + error: Some(format!( + "Flush region timeout, region: {region_id}, timeout: {:?}", + flush_timeout + )), })) } WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await, WaitResult::Finish(Err(err)) => { Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, + metadata_last_entry_id: None, exists: true, error: Some(format!("{err:?}")), })) @@ -174,7 +188,9 @@ mod tests { use common_meta::instruction::{DowngradeRegion, InstructionReply}; use mito2::engine::MITO_ENGINE_NAME; - use store_api::region_engine::{RegionRole, SetRegionRoleStateResponse}; + use store_api::region_engine::{ + RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, + }; use store_api::region_request::RegionRequest; use store_api::storage::RegionId; use tokio::time::Instant; @@ -198,7 +214,6 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout, - reject_write: false, }) .await; 
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); @@ -227,7 +242,9 @@ mod tests { Ok(0) })); region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| { - Ok(SetRegionRoleStateResponse::success(Some(1024))) + Ok(SetRegionRoleStateResponse::success( + SetRegionRoleStateSuccess::mito(1024), + )) })) }); mock_region_server.register_test_region(region_id, mock_engine); @@ -240,7 +257,6 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout, - reject_write: false, }) .await; assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); @@ -262,7 +278,9 @@ mod tests { region_engine.mock_role = Some(Some(RegionRole::Leader)); region_engine.handle_request_delay = Some(Duration::from_secs(100)); region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| { - Ok(SetRegionRoleStateResponse::success(Some(1024))) + Ok(SetRegionRoleStateResponse::success( + SetRegionRoleStateSuccess::mito(1024), + )) })) }); mock_region_server.register_test_region(region_id, mock_engine); @@ -274,7 +292,6 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: Some(flush_timeout), - reject_write: false, }) .await; assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); @@ -295,7 +312,9 @@ mod tests { region_engine.mock_role = Some(Some(RegionRole::Leader)); region_engine.handle_request_delay = Some(Duration::from_millis(300)); region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| { - Ok(SetRegionRoleStateResponse::success(Some(1024))) + Ok(SetRegionRoleStateResponse::success( + SetRegionRoleStateSuccess::mito(1024), + )) })) }); mock_region_server.register_test_region(region_id, mock_engine); @@ -312,7 +331,6 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout, - reject_write: false, }) .await; assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); @@ -327,7 +345,6 @@ mod tests { 
.handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: Some(Duration::from_millis(500)), - reject_write: false, }) .await; assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); @@ -356,7 +373,9 @@ mod tests { .fail() })); region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| { - Ok(SetRegionRoleStateResponse::success(Some(1024))) + Ok(SetRegionRoleStateResponse::success( + SetRegionRoleStateSuccess::mito(1024), + )) })) }); mock_region_server.register_test_region(region_id, mock_engine); @@ -373,7 +392,6 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout, - reject_write: false, }) .await; assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); @@ -388,7 +406,6 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: Some(Duration::from_millis(500)), - reject_write: false, }) .await; assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); @@ -419,7 +436,6 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: None, - reject_write: false, }) .await; assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); @@ -451,7 +467,6 @@ mod tests { .handle_downgrade_region_instruction(DowngradeRegion { region_id, flush_timeout: None, - reject_write: false, }) .await; assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs index a23ae71a3d..b9a324c197 100644 --- a/src/datanode/src/heartbeat/handler/upgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs @@ -26,6 +26,7 @@ impl HandlerContext { UpgradeRegion { region_id, last_entry_id, + metadata_last_entry_id, replay_timeout, location_id, }: UpgradeRegion, @@ -63,6 +64,7 @@ impl HandlerContext { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, 
entry_id: last_entry_id, + metadata_entry_id: metadata_last_entry_id, location_id, }), ) @@ -147,6 +149,7 @@ mod tests { .handle_upgrade_region_instruction(UpgradeRegion { region_id, last_entry_id: None, + metadata_last_entry_id: None, replay_timeout, location_id: None, }) @@ -185,6 +188,7 @@ mod tests { .handle_upgrade_region_instruction(UpgradeRegion { region_id, last_entry_id: None, + metadata_last_entry_id: None, replay_timeout, location_id: None, }) @@ -224,6 +228,7 @@ mod tests { .handle_upgrade_region_instruction(UpgradeRegion { region_id, last_entry_id: None, + metadata_last_entry_id: None, replay_timeout, location_id: None, }) @@ -267,6 +272,7 @@ mod tests { region_id, replay_timeout, last_entry_id: None, + metadata_last_entry_id: None, location_id: None, }) .await; @@ -284,6 +290,7 @@ mod tests { .handle_upgrade_region_instruction(UpgradeRegion { region_id, last_entry_id: None, + metadata_last_entry_id: None, replay_timeout: Some(Duration::from_millis(500)), location_id: None, }) @@ -326,6 +333,7 @@ mod tests { .handle_upgrade_region_instruction(UpgradeRegion { region_id, last_entry_id: None, + metadata_last_entry_id: None, replay_timeout: None, location_id: None, }) @@ -344,6 +352,7 @@ mod tests { .handle_upgrade_region_instruction(UpgradeRegion { region_id, last_entry_id: None, + metadata_last_entry_id: None, replay_timeout: Some(Duration::from_millis(200)), location_id: None, }) diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 9e9d1aa405..cf5e5c7576 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -27,7 +27,8 @@ use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - SetRegionRoleStateResponse, SettableRegionRoleState, SinglePartitionScanner, + SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, + SinglePartitionScanner, }; use 
store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, @@ -132,7 +133,9 @@ impl RegionEngine for FileRegionEngine { let exists = self.inner.get_region(region_id).await.is_some(); if exists { - Ok(SetRegionRoleStateResponse::success(None)) + Ok(SetRegionRoleStateResponse::success( + SetRegionRoleStateSuccess::file(), + )) } else { Ok(SetRegionRoleStateResponse::NotFound) } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index f13939195b..a98320f9e7 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -127,6 +127,8 @@ pub struct VolatileContext { leader_region_lease_deadline: Option, /// The last_entry_id of leader region. leader_region_last_entry_id: Option, + /// The last_entry_id of leader metadata region (Only used for metric engine). + leader_region_metadata_last_entry_id: Option, /// Elapsed time of downgrading region and upgrading region. operations_elapsed: Duration, } @@ -148,6 +150,11 @@ impl VolatileContext { pub fn set_last_entry_id(&mut self, last_entry_id: u64) { self.leader_region_last_entry_id = Some(last_entry_id) } + + /// Sets the `leader_region_metadata_last_entry_id`. + pub fn set_metadata_last_entry_id(&mut self, last_entry_id: u64) { + self.leader_region_metadata_last_entry_id = Some(last_entry_id); + } } /// Used to generate new [Context]. 
diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs index 4e09e421d0..94256ba5ec 100644 --- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::time::Duration; use api::v1::meta::MailboxMessage; -use common_meta::distributed_time_constants::MAILBOX_RTT_SECS; +use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::key::datanode_table::RegionInfo; use common_meta::RegionIdent; @@ -31,7 +31,8 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; -const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS); +/// Uses lease time of a region as the timeout of closing a downgraded region. 
+const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS); #[derive(Debug, Serialize, Deserialize)] pub struct CloseDowngradedRegion; diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 41d437d466..02b7216fe7 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -22,10 +22,8 @@ use common_meta::instruction::{ }; use common_procedure::Status; use common_telemetry::{error, info, warn}; -use common_wal::options::WalOptions; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; use tokio::time::{sleep, Instant}; use crate::error::{self, Result}; @@ -97,32 +95,15 @@ impl DowngradeLeaderRegion { &self, ctx: &Context, flush_timeout: Duration, - reject_write: bool, ) -> Instruction { let pc = &ctx.persistent_ctx; let region_id = pc.region_id; Instruction::DowngradeRegion(DowngradeRegion { region_id, flush_timeout: Some(flush_timeout), - reject_write, }) } - async fn should_reject_write(ctx: &mut Context, region_id: RegionId) -> Result { - let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; - if let Some(wal_option) = datanode_table_value - .region_info - .region_wal_options - .get(®ion_id.region_number()) - { - let options: WalOptions = serde_json::from_str(wal_option) - .with_context(|_| error::DeserializeFromJsonSnafu { input: wal_option })?; - return Ok(matches!(options, WalOptions::RaftEngine)); - } - - Ok(true) - } - /// Tries to downgrade a leader region. 
/// /// Retry: @@ -143,9 +124,7 @@ impl DowngradeLeaderRegion { .context(error::ExceededDeadlineSnafu { operation: "Downgrade region", })?; - let reject_write = Self::should_reject_write(ctx, region_id).await?; - let downgrade_instruction = - self.build_downgrade_region_instruction(ctx, operation_timeout, reject_write); + let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout); let leader = &ctx.persistent_ctx.from_peer; let msg = MailboxMessage::json_message( @@ -174,6 +153,7 @@ impl DowngradeLeaderRegion { ); let InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id, + metadata_last_entry_id, exists, error, }) = reply @@ -202,9 +182,10 @@ impl DowngradeLeaderRegion { ); } else { info!( - "Region {} leader is downgraded, last_entry_id: {:?}, elapsed: {:?}", + "Region {} leader is downgraded, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}", region_id, last_entry_id, + metadata_last_entry_id, now.elapsed() ); } @@ -213,6 +194,11 @@ impl DowngradeLeaderRegion { ctx.volatile_ctx.set_last_entry_id(last_entry_id); } + if let Some(metadata_last_entry_id) = metadata_last_entry_id { + ctx.volatile_ctx + .set_metadata_last_entry_id(metadata_last_entry_id); + } + Ok(()) } Err(error::Error::MailboxTimeout { .. 
}) => { @@ -276,7 +262,6 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; - use common_wal::options::KafkaWalOptions; use store_api::storage::RegionId; use tokio::time::Instant; @@ -331,41 +316,6 @@ mod tests { assert!(!err.is_retryable()); } - #[tokio::test] - async fn test_should_reject_writes() { - let persistent_context = new_persistent_context(); - let region_id = persistent_context.region_id; - let env = TestingEnv::new(); - let mut ctx = env.context_factory().new_context(persistent_context); - let wal_options = - HashMap::from([(1, serde_json::to_string(&WalOptions::RaftEngine).unwrap())]); - prepare_table_metadata(&ctx, wal_options).await; - - let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id) - .await - .unwrap(); - assert!(reject_write); - - // Remote WAL - let persistent_context = new_persistent_context(); - let region_id = persistent_context.region_id; - let env = TestingEnv::new(); - let mut ctx = env.context_factory().new_context(persistent_context); - let wal_options = HashMap::from([( - 1, - serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { - topic: "my_topic".to_string(), - })) - .unwrap(), - )]); - prepare_table_metadata(&ctx, wal_options).await; - - let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id) - .await - .unwrap(); - assert!(!reject_write); - } - #[tokio::test] async fn test_pusher_dropped() { let state = DowngradeLeaderRegion::default(); diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 6f42c670dd..552b9d3863 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -76,10 +76,12 @@ impl UpgradeCandidateRegion { let pc = &ctx.persistent_ctx; let 
region_id = pc.region_id; let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id; + let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id; Instruction::UpgradeRegion(UpgradeRegion { region_id, last_entry_id, + metadata_last_entry_id, replay_timeout: Some(replay_timeout), location_id: Some(ctx.persistent_ctx.from_peer.id), }) diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 61254b133b..34ce23abd4 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -135,6 +135,7 @@ pub fn new_downgrade_region_reply( payload: Some(Payload::Json( serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id, + metadata_last_entry_id: None, exists: exist, error, })) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index d1e837d078..74978fda78 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -40,7 +40,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - SetRegionRoleStateResponse, SettableRegionRoleState, + SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, }; use store_api::region_request::{BatchRegionDdlRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -352,17 +352,39 @@ impl RegionEngine for MetricEngine { region_id: RegionId, region_role_state: SettableRegionRoleState, ) -> std::result::Result { - self.inner + let metadata_result = match self + .inner .mito .set_region_role_state_gracefully( utils::to_metadata_region_id(region_id), region_role_state, ) - .await?; - self.inner + .await? 
+ { + SetRegionRoleStateResponse::Success(success) => success, + SetRegionRoleStateResponse::NotFound => { + return Ok(SetRegionRoleStateResponse::NotFound) + } + }; + + let data_result = match self + .inner .mito .set_region_role_state_gracefully(region_id, region_role_state) - .await + .await? + { + SetRegionRoleStateResponse::Success(success) => success, + SetRegionRoleStateResponse::NotFound => { + return Ok(SetRegionRoleStateResponse::NotFound) + } + }; + + Ok(SetRegionRoleStateResponse::success( + SetRegionRoleStateSuccess::metric( + data_result.last_entry_id().unwrap_or_default(), + metadata_result.last_entry_id().unwrap_or_default(), + ), + )) } /// Returns the physical region role. diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs index 44713f0bc4..d2e92f6e0e 100644 --- a/src/metric-engine/src/engine/catchup.rs +++ b/src/metric-engine/src/engine/catchup.rs @@ -56,7 +56,8 @@ impl MetricEngineInner { metadata_region_id, RegionRequest::Catchup(RegionCatchupRequest { set_writable: req.set_writable, - entry_id: None, + entry_id: req.metadata_entry_id, + metadata_entry_id: None, location_id: req.location_id, }), ) @@ -70,6 +71,7 @@ impl MetricEngineInner { RegionRequest::Catchup(RegionCatchupRequest { set_writable: req.set_writable, entry_id: req.entry_id, + metadata_entry_id: None, location_id: req.location_id, }), ) diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index a9de0d6008..8a24fecf3a 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -35,8 +35,8 @@ use crate::test_util::{ use crate::wal::EntryId; fn get_last_entry_id(resp: SetRegionRoleStateResponse) -> Option { - if let SetRegionRoleStateResponse::Success { last_entry_id } = resp { - last_entry_id + if let SetRegionRoleStateResponse::Success(success) = resp { + success.last_entry_id() } else { unreachable!(); } @@ -118,6 +118,7 @@ async fn 
test_catchup_with_last_entry_id(factory: Option) { RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, entry_id: last_entry_id, + metadata_entry_id: None, location_id: None, }), ) @@ -150,6 +151,7 @@ async fn test_catchup_with_last_entry_id(factory: Option) { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: last_entry_id, + metadata_entry_id: None, location_id: None, }), ) @@ -237,6 +239,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option) { RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, entry_id: None, + metadata_entry_id: None, location_id: None, }), ) @@ -353,6 +358,7 @@ async fn test_catchup_without_last_entry_id(factory: Option) { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: None, + metadata_entry_id: None, location_id: None, }), ) @@ -442,6 +448,7 @@ async fn test_catchup_with_manifest_update(factory: Option) { RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, entry_id: None, + metadata_entry_id: None, location_id: None, }), ) @@ -479,6 +486,7 @@ async fn test_catchup_with_manifest_update(factory: Option) { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: None, + metadata_entry_id: None, location_id: None, }), ) @@ -501,6 +509,7 @@ async fn test_catchup_not_exist() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: None, + metadata_entry_id: None, location_id: None, }), ) diff --git a/src/mito2/src/engine/set_role_state_test.rs b/src/mito2/src/engine/set_role_state_test.rs index 2a4cb9f9ca..93dbc69407 100644 --- a/src/mito2/src/engine/set_role_state_test.rs +++ b/src/mito2/src/engine/set_role_state_test.rs @@ -16,7 +16,8 @@ use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use store_api::region_engine::{ - RegionEngine, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState, + RegionEngine, RegionRole, 
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, + SettableRegionRoleState, }; use store_api::region_request::{RegionPutRequest, RegionRequest}; use store_api::storage::RegionId; @@ -48,9 +49,7 @@ async fn test_set_role_state_gracefully() { .await .unwrap(); assert_eq!( - SetRegionRoleStateResponse::Success { - last_entry_id: Some(0) - }, + SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(0)), result ); @@ -60,9 +59,7 @@ async fn test_set_role_state_gracefully() { .await .unwrap(); assert_eq!( - SetRegionRoleStateResponse::Success { - last_entry_id: Some(0) - }, + SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(0)), result ); @@ -96,9 +93,7 @@ async fn test_set_role_state_gracefully() { .unwrap(); assert_eq!( - SetRegionRoleStateResponse::Success { - last_entry_id: Some(1) - }, + SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(1)), result ); } @@ -144,9 +139,7 @@ async fn test_write_downgrading_region() { .await .unwrap(); assert_eq!( - SetRegionRoleStateResponse::Success { - last_entry_id: Some(1) - }, + SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(1)), result ); diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 0eb5abff41..7aee65b651 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -41,7 +41,9 @@ use prometheus::IntGauge; use rand::{rng, Rng}; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; -use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; +use store_api::region_engine::{ + SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, +}; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, oneshot, watch, Mutex}; @@ -931,7 +933,9 @@ impl RegionWorkerLoop { region.set_role_state_gracefully(region_role_state).await; let last_entry_id = region.version_control.current().last_entry_id; - let _ = 
sender.send(SetRegionRoleStateResponse::success(Some(last_entry_id))); + let _ = sender.send(SetRegionRoleStateResponse::success( + SetRegionRoleStateSuccess::mito(last_entry_id), + )); }); } else { let _ = sender.send(SetRegionRoleStateResponse::NotFound); diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index d4df5216f4..0a38700f1d 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -47,6 +47,15 @@ pub enum SettableRegionRoleState { DowngradingLeader, } +impl Display for SettableRegionRoleState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SettableRegionRoleState::Follower => write!(f, "Follower"), + SettableRegionRoleState::DowngradingLeader => write!(f, "Leader(Downgrading)"), + } + } +} + impl From for RegionRole { fn from(value: SettableRegionRoleState) -> Self { match value { @@ -63,20 +72,78 @@ pub struct SetRegionRoleStateRequest { region_role_state: SettableRegionRoleState, } +/// The success response of setting region role state. +#[derive(Debug, PartialEq, Eq)] +pub enum SetRegionRoleStateSuccess { + File, + Mito { + last_entry_id: entry::Id, + }, + Metric { + last_entry_id: entry::Id, + metadata_last_entry_id: entry::Id, + }, +} + +impl SetRegionRoleStateSuccess { + /// Returns a [SetRegionRoleStateSuccess::File]. + pub fn file() -> Self { + Self::File + } + + /// Returns a [SetRegionRoleStateSuccess::Mito] with the `last_entry_id`. + pub fn mito(last_entry_id: entry::Id) -> Self { + SetRegionRoleStateSuccess::Mito { last_entry_id } + } + + /// Returns a [SetRegionRoleStateSuccess::Metric] with the `last_entry_id` and `metadata_last_entry_id`. + pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self { + SetRegionRoleStateSuccess::Metric { + last_entry_id, + metadata_last_entry_id, + } + } +} + +impl SetRegionRoleStateSuccess { + /// Returns the last entry id of the region. 
+ pub fn last_entry_id(&self) -> Option { + match self { + SetRegionRoleStateSuccess::File => None, + SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id), + SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id), + } + } + + /// Returns the last entry id of the metadata of the region. + pub fn metadata_last_entry_id(&self) -> Option { + match self { + SetRegionRoleStateSuccess::File => None, + SetRegionRoleStateSuccess::Mito { .. } => None, + SetRegionRoleStateSuccess::Metric { + metadata_last_entry_id, + .. + } => Some(*metadata_last_entry_id), + } + } +} + /// The response of setting region role state. #[derive(Debug, PartialEq, Eq)] pub enum SetRegionRoleStateResponse { - Success { - /// Returns `last_entry_id` of the region if available(e.g., It's not available in file engine). - last_entry_id: Option, - }, + Success(SetRegionRoleStateSuccess), NotFound, } impl SetRegionRoleStateResponse { - /// Returns a [SetRegionRoleStateResponse::Success] with the `last_entry_id`. - pub fn success(last_entry_id: Option) -> Self { - Self::Success { last_entry_id } + /// Returns a [SetRegionRoleStateResponse::Success] wrapping the given [SetRegionRoleStateSuccess]. + pub fn success(success: SetRegionRoleStateSuccess) -> Self { + Self::Success(success) + } + + /// Returns true if the response is a [SetRegionRoleStateResponse::NotFound]. + pub fn is_not_found(&self) -> bool { + matches!(self, SetRegionRoleStateResponse::NotFound) } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index d11963987e..638f2ee114 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -1105,6 +1105,10 @@ pub struct RegionCatchupRequest { /// The `entry_id` that was expected to reply to. /// `None` stands replaying to latest. pub entry_id: Option, + /// Used for the metadata region of the metric engine. + /// The `entry_id` that the metadata region is expected to replay to. + /// `None` stands for replaying to the latest entry.
+ pub metadata_entry_id: Option, /// The hint for replaying memtable. pub location_id: Option, }