fix: alway rejects write while downgrading region (#5842)

* fix: alway rejects write while downgrading region

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-04-11 14:42:41 +08:00
committed by GitHub
parent 84e2bc52c2
commit 5a36fa5e18
18 changed files with 244 additions and 154 deletions

View File

@@ -25,12 +25,13 @@ use session::context::QueryContextRef;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
const DEFAULT_TIMEOUT_SECS: u64 = 30;
/// The default timeout for migrate region procedure.
const DEFAULT_TIMEOUT_SECS: u64 = 300;
/// A function to migrate a region from source peer to target peer.
/// Returns the submitted procedure id if success. Only available in cluster mode.
///
/// - `migrate_region(region_id, from_peer, to_peer)`, with timeout(30 seconds).
/// - `migrate_region(region_id, from_peer, to_peer)`, with timeout(300 seconds).
/// - `migrate_region(region_id, from_peer, to_peer, timeout(secs))`.
///
/// The parameters:

View File

@@ -57,6 +57,8 @@ impl Display for RegionIdent {
pub struct DowngradeRegionReply {
/// Returns the `last_entry_id` if available.
pub last_entry_id: Option<u64>,
/// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
pub metadata_last_entry_id: Option<u64>,
/// Indicates whether the region exists.
pub exists: bool,
/// Return error if any during the operation.
@@ -136,16 +138,14 @@ pub struct DowngradeRegion {
/// `None` stands for don't flush before downgrading the region.
#[serde(default)]
pub flush_timeout: Option<Duration>,
/// Rejects all write requests after flushing.
pub reject_write: bool,
}
impl Display for DowngradeRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DowngradeRegion(region_id={}, flush_timeout={:?}, rejct_write={})",
self.region_id, self.flush_timeout, self.reject_write
"DowngradeRegion(region_id={}, flush_timeout={:?})",
self.region_id, self.flush_timeout,
)
}
}
@@ -157,6 +157,8 @@ pub struct UpgradeRegion {
pub region_id: RegionId,
/// The `last_entry_id` of old leader region.
pub last_entry_id: Option<u64>,
/// The `last_entry_id` of old leader metadata region (Only used for metric engine).
pub metadata_last_entry_id: Option<u64>,
/// The timeout of waiting for a wal replay.
///
/// `None` stands for no wait,

View File

@@ -220,7 +220,6 @@ mod tests {
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
flush_timeout: Some(Duration::from_secs(1)),
reject_write: false,
});
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
@@ -229,6 +228,7 @@ mod tests {
let instruction = Instruction::UpgradeRegion(UpgradeRegion {
region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: None,
location_id: None,
});
@@ -419,7 +419,6 @@ mod tests {
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_secs(1)),
reject_write: false,
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
@@ -442,7 +441,6 @@ mod tests {
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
flush_timeout: Some(Duration::from_secs(1)),
reject_write: false,
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();

View File

@@ -14,7 +14,7 @@
use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
use common_telemetry::tracing::info;
use common_telemetry::warn;
use common_telemetry::{error, warn};
use futures_util::future::BoxFuture;
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
@@ -33,25 +33,32 @@ impl HandlerContext {
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
{
Ok(SetRegionRoleStateResponse::Success { last_entry_id }) => {
Ok(SetRegionRoleStateResponse::Success(success)) => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
last_entry_id: success.last_entry_id(),
metadata_last_entry_id: success.metadata_last_entry_id(),
exists: true,
error: None,
}))
}
Ok(SetRegionRoleStateResponse::NotFound) => {
warn!("Region: {region_id} is not found");
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: false,
error: None,
}))
}
Err(err) => Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
})),
Err(err) => {
error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}))
}
}
}
@@ -60,7 +67,6 @@ impl HandlerContext {
DowngradeRegion {
region_id,
flush_timeout,
reject_write,
}: DowngradeRegion,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
@@ -68,6 +74,7 @@ impl HandlerContext {
warn!("Region: {region_id} is not found");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: false,
error: None,
}));
@@ -89,33 +96,35 @@ impl HandlerContext {
return self.downgrade_to_follower_gracefully(region_id).await;
};
if reject_write {
// Sets region to downgrading, the downgrading region will reject all write requests.
match self
.region_server
.set_region_role_state_gracefully(
region_id,
SettableRegionRoleState::DowngradingLeader,
)
.await
{
Ok(SetRegionRoleStateResponse::Success { .. }) => {}
Ok(SetRegionRoleStateResponse::NotFound) => {
warn!("Region: {region_id} is not found");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
}));
}
Err(err) => {
warn!(err; "Failed to convert region to downgrading leader");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}));
}
// Sets region to downgrading,
// the downgrading region will reject all write requests.
// However, the downgrading region will still accept read, flush requests.
match self
.region_server
.set_region_role_state_gracefully(
region_id,
SettableRegionRoleState::DowngradingLeader,
)
.await
{
Ok(SetRegionRoleStateResponse::Success { .. }) => {}
Ok(SetRegionRoleStateResponse::NotFound) => {
warn!("Region: {region_id} is not found");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: false,
error: None,
}));
}
Err(err) => {
error!(err; "Failed to convert region to downgrading leader");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}));
}
}
@@ -144,20 +153,25 @@ impl HandlerContext {
}
let mut watcher = register_result.into_watcher();
let result = self.catchup_tasks.wait(&mut watcher, flush_timeout).await;
let result = self.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
match result {
WaitResult::Timeout => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("Flush region: {region_id} is timeout")),
error: Some(format!(
"Flush region timeout, region: {region_id}, timeout: {:?}",
flush_timeout
)),
}))
}
WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
WaitResult::Finish(Err(err)) => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}))
@@ -174,7 +188,9 @@ mod tests {
use common_meta::instruction::{DowngradeRegion, InstructionReply};
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::{RegionRole, SetRegionRoleStateResponse};
use store_api::region_engine::{
RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
};
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::time::Instant;
@@ -198,7 +214,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
@@ -227,7 +242,9 @@ mod tests {
Ok(0)
}));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(1024),
))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
@@ -240,7 +257,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
@@ -262,7 +278,9 @@ mod tests {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_delay = Some(Duration::from_secs(100));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(1024),
))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
@@ -274,7 +292,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(flush_timeout),
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
@@ -295,7 +312,9 @@ mod tests {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_delay = Some(Duration::from_millis(300));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(1024),
))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
@@ -312,7 +331,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
@@ -327,7 +345,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
@@ -356,7 +373,9 @@ mod tests {
.fail()
}));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(1024),
))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
@@ -373,7 +392,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
@@ -388,7 +406,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
@@ -419,7 +436,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: None,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
@@ -451,7 +467,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: None,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));

View File

@@ -26,6 +26,7 @@ impl HandlerContext {
UpgradeRegion {
region_id,
last_entry_id,
metadata_last_entry_id,
replay_timeout,
location_id,
}: UpgradeRegion,
@@ -63,6 +64,7 @@ impl HandlerContext {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
metadata_entry_id: metadata_last_entry_id,
location_id,
}),
)
@@ -147,6 +149,7 @@ mod tests {
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout,
location_id: None,
})
@@ -185,6 +188,7 @@ mod tests {
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout,
location_id: None,
})
@@ -224,6 +228,7 @@ mod tests {
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout,
location_id: None,
})
@@ -267,6 +272,7 @@ mod tests {
region_id,
replay_timeout,
last_entry_id: None,
metadata_last_entry_id: None,
location_id: None,
})
.await;
@@ -284,6 +290,7 @@ mod tests {
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: Some(Duration::from_millis(500)),
location_id: None,
})
@@ -326,6 +333,7 @@ mod tests {
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: None,
location_id: None,
})
@@ -344,6 +352,7 @@ mod tests {
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: Some(Duration::from_millis(200)),
location_id: None,
})

View File

@@ -27,7 +27,8 @@ use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState, SinglePartitionScanner,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SinglePartitionScanner,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
@@ -132,7 +133,9 @@ impl RegionEngine for FileRegionEngine {
let exists = self.inner.get_region(region_id).await.is_some();
if exists {
Ok(SetRegionRoleStateResponse::success(None))
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::file(),
))
} else {
Ok(SetRegionRoleStateResponse::NotFound)
}

View File

@@ -127,6 +127,8 @@ pub struct VolatileContext {
leader_region_lease_deadline: Option<Instant>,
/// The last_entry_id of leader region.
leader_region_last_entry_id: Option<u64>,
/// The last_entry_id of leader metadata region (Only used for metric engine).
leader_region_metadata_last_entry_id: Option<u64>,
/// Elapsed time of downgrading region and upgrading region.
operations_elapsed: Duration,
}
@@ -148,6 +150,11 @@ impl VolatileContext {
pub fn set_last_entry_id(&mut self, last_entry_id: u64) {
self.leader_region_last_entry_id = Some(last_entry_id)
}
/// Sets the `leader_region_metadata_last_entry_id`.
pub fn set_metadata_last_entry_id(&mut self, last_entry_id: u64) {
self.leader_region_metadata_last_entry_id = Some(last_entry_id);
}
}
/// Used to generate new [Context].

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
@@ -31,7 +31,8 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);
/// Uses lease time of a region as the timeout of closing a downgraded region.
const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;

View File

@@ -22,10 +22,8 @@ use common_meta::instruction::{
};
use common_procedure::Status;
use common_telemetry::{error, info, warn};
use common_wal::options::WalOptions;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::time::{sleep, Instant};
use crate::error::{self, Result};
@@ -97,32 +95,15 @@ impl DowngradeLeaderRegion {
&self,
ctx: &Context,
flush_timeout: Duration,
reject_write: bool,
) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::DowngradeRegion(DowngradeRegion {
region_id,
flush_timeout: Some(flush_timeout),
reject_write,
})
}
async fn should_reject_write(ctx: &mut Context, region_id: RegionId) -> Result<bool> {
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
if let Some(wal_option) = datanode_table_value
.region_info
.region_wal_options
.get(&region_id.region_number())
{
let options: WalOptions = serde_json::from_str(wal_option)
.with_context(|_| error::DeserializeFromJsonSnafu { input: wal_option })?;
return Ok(matches!(options, WalOptions::RaftEngine));
}
Ok(true)
}
/// Tries to downgrade a leader region.
///
/// Retry:
@@ -143,9 +124,7 @@ impl DowngradeLeaderRegion {
.context(error::ExceededDeadlineSnafu {
operation: "Downgrade region",
})?;
let reject_write = Self::should_reject_write(ctx, region_id).await?;
let downgrade_instruction =
self.build_downgrade_region_instruction(ctx, operation_timeout, reject_write);
let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);
let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
@@ -174,6 +153,7 @@ impl DowngradeLeaderRegion {
);
let InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
metadata_last_entry_id,
exists,
error,
}) = reply
@@ -202,9 +182,10 @@ impl DowngradeLeaderRegion {
);
} else {
info!(
"Region {} leader is downgraded, last_entry_id: {:?}, elapsed: {:?}",
"Region {} leader is downgraded, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
region_id,
last_entry_id,
metadata_last_entry_id,
now.elapsed()
);
}
@@ -213,6 +194,11 @@ impl DowngradeLeaderRegion {
ctx.volatile_ctx.set_last_entry_id(last_entry_id);
}
if let Some(metadata_last_entry_id) = metadata_last_entry_id {
ctx.volatile_ctx
.set_metadata_last_entry_id(metadata_last_entry_id);
}
Ok(())
}
Err(error::Error::MailboxTimeout { .. }) => {
@@ -276,7 +262,6 @@ mod tests {
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_wal::options::KafkaWalOptions;
use store_api::storage::RegionId;
use tokio::time::Instant;
@@ -331,41 +316,6 @@ mod tests {
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_should_reject_writes() {
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let wal_options =
HashMap::from([(1, serde_json::to_string(&WalOptions::RaftEngine).unwrap())]);
prepare_table_metadata(&ctx, wal_options).await;
let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id)
.await
.unwrap();
assert!(reject_write);
// Remote WAL
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let wal_options = HashMap::from([(
1,
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "my_topic".to_string(),
}))
.unwrap(),
)]);
prepare_table_metadata(&ctx, wal_options).await;
let reject_write = DowngradeLeaderRegion::should_reject_write(&mut ctx, region_id)
.await
.unwrap();
assert!(!reject_write);
}
#[tokio::test]
async fn test_pusher_dropped() {
let state = DowngradeLeaderRegion::default();

View File

@@ -76,10 +76,12 @@ impl UpgradeCandidateRegion {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
Instruction::UpgradeRegion(UpgradeRegion {
region_id,
last_entry_id,
metadata_last_entry_id,
replay_timeout: Some(replay_timeout),
location_id: Some(ctx.persistent_ctx.from_peer.id),
})

View File

@@ -135,6 +135,7 @@ pub fn new_downgrade_region_reply(
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
metadata_last_entry_id: None,
exists: exist,
error,
}))

View File

@@ -40,7 +40,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -352,17 +352,39 @@ impl RegionEngine for MetricEngine {
region_id: RegionId,
region_role_state: SettableRegionRoleState,
) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
self.inner
let metadata_result = match self
.inner
.mito
.set_region_role_state_gracefully(
utils::to_metadata_region_id(region_id),
region_role_state,
)
.await?;
self.inner
.await?
{
SetRegionRoleStateResponse::Success(success) => success,
SetRegionRoleStateResponse::NotFound => {
return Ok(SetRegionRoleStateResponse::NotFound)
}
};
let data_result = match self
.inner
.mito
.set_region_role_state_gracefully(region_id, region_role_state)
.await
.await?
{
SetRegionRoleStateResponse::Success(success) => success,
SetRegionRoleStateResponse::NotFound => {
return Ok(SetRegionRoleStateResponse::NotFound)
}
};
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::metric(
data_result.last_entry_id().unwrap_or_default(),
metadata_result.last_entry_id().unwrap_or_default(),
),
))
}
/// Returns the physical region role.

View File

@@ -56,7 +56,8 @@ impl MetricEngineInner {
metadata_region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: None,
entry_id: req.metadata_entry_id,
metadata_entry_id: None,
location_id: req.location_id,
}),
)
@@ -70,6 +71,7 @@ impl MetricEngineInner {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: req.entry_id,
metadata_entry_id: None,
location_id: req.location_id,
}),
)

View File

@@ -35,8 +35,8 @@ use crate::test_util::{
use crate::wal::EntryId;
fn get_last_entry_id(resp: SetRegionRoleStateResponse) -> Option<EntryId> {
if let SetRegionRoleStateResponse::Success { last_entry_id } = resp {
last_entry_id
if let SetRegionRoleStateResponse::Success(success) = resp {
success.last_entry_id()
} else {
unreachable!();
}
@@ -118,6 +118,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: last_entry_id,
metadata_entry_id: None,
location_id: None,
}),
)
@@ -150,6 +151,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
metadata_entry_id: None,
location_id: None,
}),
)
@@ -237,6 +239,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
metadata_entry_id: None,
location_id: None,
}),
)
@@ -254,6 +257,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
metadata_entry_id: None,
location_id: None,
}),
)
@@ -322,6 +326,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
metadata_entry_id: None,
location_id: None,
}),
)
@@ -353,6 +358,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
metadata_entry_id: None,
location_id: None,
}),
)
@@ -442,6 +448,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
metadata_entry_id: None,
location_id: None,
}),
)
@@ -479,6 +486,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
metadata_entry_id: None,
location_id: None,
}),
)
@@ -501,6 +509,7 @@ async fn test_catchup_not_exist() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
metadata_entry_id: None,
location_id: None,
}),
)

View File

@@ -16,7 +16,8 @@ use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::{
RegionEngine, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState,
};
use store_api::region_request::{RegionPutRequest, RegionRequest};
use store_api::storage::RegionId;
@@ -48,9 +49,7 @@ async fn test_set_role_state_gracefully() {
.await
.unwrap();
assert_eq!(
SetRegionRoleStateResponse::Success {
last_entry_id: Some(0)
},
SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(0)),
result
);
@@ -60,9 +59,7 @@ async fn test_set_role_state_gracefully() {
.await
.unwrap();
assert_eq!(
SetRegionRoleStateResponse::Success {
last_entry_id: Some(0)
},
SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(0)),
result
);
@@ -96,9 +93,7 @@ async fn test_set_role_state_gracefully() {
.unwrap();
assert_eq!(
SetRegionRoleStateResponse::Success {
last_entry_id: Some(1)
},
SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(1)),
result
);
}
@@ -144,9 +139,7 @@ async fn test_write_downgrading_region() {
.await
.unwrap();
assert_eq!(
SetRegionRoleStateResponse::Success {
last_entry_id: Some(1)
},
SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(1)),
result
);

View File

@@ -41,7 +41,9 @@ use prometheus::IntGauge;
use rand::{rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_engine::{
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, watch, Mutex};
@@ -931,7 +933,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region.set_role_state_gracefully(region_role_state).await;
let last_entry_id = region.version_control.current().last_entry_id;
let _ = sender.send(SetRegionRoleStateResponse::success(Some(last_entry_id)));
let _ = sender.send(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(last_entry_id),
));
});
} else {
let _ = sender.send(SetRegionRoleStateResponse::NotFound);

View File

@@ -47,6 +47,15 @@ pub enum SettableRegionRoleState {
DowngradingLeader,
}
impl Display for SettableRegionRoleState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SettableRegionRoleState::Follower => write!(f, "Follower"),
SettableRegionRoleState::DowngradingLeader => write!(f, "Leader(Downgrading)"),
}
}
}
impl From<SettableRegionRoleState> for RegionRole {
fn from(value: SettableRegionRoleState) -> Self {
match value {
@@ -63,20 +72,78 @@ pub struct SetRegionRoleStateRequest {
region_role_state: SettableRegionRoleState,
}
/// The success response of setting region role state.
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateSuccess {
File,
Mito {
last_entry_id: entry::Id,
},
Metric {
last_entry_id: entry::Id,
metadata_last_entry_id: entry::Id,
},
}
impl SetRegionRoleStateSuccess {
/// Returns a [SetRegionRoleStateSuccess::File].
pub fn file() -> Self {
Self::File
}
/// Returns a [SetRegionRoleStateSuccess::Mito] with the `last_entry_id`.
pub fn mito(last_entry_id: entry::Id) -> Self {
SetRegionRoleStateSuccess::Mito { last_entry_id }
}
/// Returns a [SetRegionRoleStateSuccess::Metric] with the `last_entry_id` and `metadata_last_entry_id`.
pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
SetRegionRoleStateSuccess::Metric {
last_entry_id,
metadata_last_entry_id,
}
}
}
impl SetRegionRoleStateSuccess {
/// Returns the last entry id of the region.
pub fn last_entry_id(&self) -> Option<entry::Id> {
match self {
SetRegionRoleStateSuccess::File => None,
SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
}
}
/// Returns the last entry id of the metadata of the region.
pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
match self {
SetRegionRoleStateSuccess::File => None,
SetRegionRoleStateSuccess::Mito { .. } => None,
SetRegionRoleStateSuccess::Metric {
metadata_last_entry_id,
..
} => Some(*metadata_last_entry_id),
}
}
}
/// The response of setting region role state.
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateResponse {
Success {
/// Returns `last_entry_id` of the region if available(e.g., It's not available in file engine).
last_entry_id: Option<entry::Id>,
},
Success(SetRegionRoleStateSuccess),
NotFound,
}
impl SetRegionRoleStateResponse {
/// Returns a [SetRegionRoleStateResponse::Success] with the `last_entry_id`.
pub fn success(last_entry_id: Option<entry::Id>) -> Self {
Self::Success { last_entry_id }
/// Returns a [SetRegionRoleStateResponse::Success] with the `File` success.
pub fn success(success: SetRegionRoleStateSuccess) -> Self {
Self::Success(success)
}
/// Returns true if the response is a [SetRegionRoleStateResponse::NotFound].
pub fn is_not_found(&self) -> bool {
matches!(self, SetRegionRoleStateResponse::NotFound)
}
}

View File

@@ -1105,6 +1105,10 @@ pub struct RegionCatchupRequest {
/// The `entry_id` that was expected to reply to.
/// `None` stands replaying to latest.
pub entry_id: Option<entry::Id>,
/// Used for metrics metadata region.
/// The `entry_id` that was expected to reply to.
/// `None` stands replaying to latest.
pub metadata_entry_id: Option<entry::Id>,
/// The hint for replaying memtable.
pub location_id: Option<u64>,
}