Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00
refactor: refactor instruction handler and adds support for batch region downgrade operations (#7130)
* refactor: refactor instruction handler Signed-off-by: WenyXu <wenymedia@gmail.com>
* refactor: support batch downgrade region instructions Signed-off-by: WenyXu <wenymedia@gmail.com>
* fix compat Signed-off-by: WenyXu <wenymedia@gmail.com>
* fix clippy Signed-off-by: WenyXu <wenymedia@gmail.com>
* add tests Signed-off-by: WenyXu <wenymedia@gmail.com>
* chore: add comments Signed-off-by: WenyXu <wenymedia@gmail.com>

Signed-off-by: WenyXu <wenymedia@gmail.com>
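[Editor's note] The diff below does two things at once. First, the datanode's heartbeat instruction handling is reworked from boxed `FnOnce` closures into an `InstructionHandler` trait with one small handler type per instruction kind. Second, the single-region `Instruction::DowngradeRegion(DowngradeRegion)` variant becomes the batched `Instruction::DowngradeRegions(Vec<DowngradeRegion>)`, with serde aliases and custom deserializers keeping old single-region payloads readable. The sketch below mirrors `test_deserialize_instruction` further down and is illustrative only; the JSON-array form of the payload is an assumption based on the `single_or_multiple_from` deserializer named in the diff, not something the shown tests exercise.

// Illustrative sketch only (not part of the commit); mirrors the
// `test_deserialize_instruction` test shown later in this diff.
use std::time::Duration;

use common_meta::instruction::{DowngradeRegion, Instruction};
use store_api::storage::RegionId;

fn main() {
    // Old-style single-object payload, still accepted after this change...
    let single = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
    // ...and the batched payload (assumed shape: a plain JSON array of objects).
    let batched = r#"{"DowngradeRegions":[{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}]}"#;

    let expected = Instruction::DowngradeRegions(vec![DowngradeRegion {
        region_id: RegionId::new(1024, 1),
        flush_timeout: Some(Duration::from_millis(1000)),
    }]);

    assert_eq!(serde_json::from_str::<Instruction>(single).unwrap(), expected);
    assert_eq!(serde_json::from_str::<Instruction>(batched).unwrap(), expected);
}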
@@ -55,6 +55,10 @@ impl Display for RegionIdent {
/// The result of downgrade leader region.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionReply {
    /// The [RegionId].
    /// For compatibility, it is defaulted to [RegionId::new(0, 0)].
    #[serde(default)]
    pub region_id: RegionId,
    /// Returns the `last_entry_id` if available.
    pub last_entry_id: Option<u64>,
    /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
@@ -423,14 +427,60 @@ pub enum Instruction {
    CloseRegions(Vec<RegionIdent>),
    /// Upgrades a region.
    UpgradeRegion(UpgradeRegion),
    #[serde(
        deserialize_with = "single_or_multiple_from",
        alias = "DowngradeRegion"
    )]
    /// Downgrades a region.
    DowngradeRegion(DowngradeRegion),
    DowngradeRegions(Vec<DowngradeRegion>),
    /// Invalidates batch cache.
    InvalidateCaches(Vec<CacheIdent>),
    /// Flushes regions.
    FlushRegions(FlushRegions),
}

impl Instruction {
    /// Converts the instruction into a vector of [OpenRegion].
    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
        match self {
            Self::OpenRegions(open_regions) => Some(open_regions),
            _ => None,
        }
    }

    /// Converts the instruction into a vector of [RegionIdent].
    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
        match self {
            Self::CloseRegions(close_regions) => Some(close_regions),
            _ => None,
        }
    }

    /// Converts the instruction into a [FlushRegions].
    pub fn into_flush_regions(self) -> Option<FlushRegions> {
        match self {
            Self::FlushRegions(flush_regions) => Some(flush_regions),
            _ => None,
        }
    }

    /// Converts the instruction into a [DowngradeRegion].
    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
        match self {
            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
            _ => None,
        }
    }

    /// Converts the instruction into a [UpgradeRegion].
    pub fn into_upgrade_regions(self) -> Option<UpgradeRegion> {
        match self {
            Self::UpgradeRegion(upgrade_region) => Some(upgrade_region),
            _ => None,
        }
    }
}

/// The reply of [UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
@@ -452,6 +502,39 @@ impl Display for UpgradeRegionReply {
    }
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionsReply {
    pub replies: Vec<DowngradeRegionReply>,
}

impl DowngradeRegionsReply {
    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
        Self { replies }
    }

    pub fn single(reply: DowngradeRegionReply) -> Self {
        Self::new(vec![reply])
    }
}

#[derive(Deserialize)]
#[serde(untagged)]
enum DowngradeRegionsCompat {
    Single(DowngradeRegionReply),
    Multiple(DowngradeRegionsReply),
}

fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
where
    D: Deserializer<'de>,
{
    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
    Ok(match helper {
        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
        DowngradeRegionsCompat::Multiple(reply) => reply,
    })
}

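[Editor's note] The `single_or_multiple_from` deserializer referenced on the `DowngradeRegions` variant above is not part of this hunk. Assuming it follows the same pattern as `downgrade_regions_compat_from`, a minimal generic sketch of that technique looks like this (hypothetical code, not the actual `common_meta` implementation):

// Hypothetical sketch of a "single value or list" deserializer; the real
// `single_or_multiple_from` helper may differ in detail.
use serde::{Deserialize, Deserializer};

#[derive(Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    Single(T),
    Multiple(Vec<T>),
}

fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
where
    D: Deserializer<'de>,
    T: Deserialize<'de>,
{
    // With `#[serde(untagged)]`, the first variant that parses wins: a bare
    // JSON object becomes `Single`, a JSON array becomes `Multiple`.
    Ok(match SingleOrMultiple::<T>::deserialize(deserializer)? {
        SingleOrMultiple::Single(value) => vec![value],
        SingleOrMultiple::Multiple(values) => values,
    })
}

Combined with the `alias = "DowngradeRegion"` attribute, this appears to be what lets an older metasrv keep sending single-region payloads to an upgraded datanode.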
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
@@ -460,7 +543,11 @@ pub enum InstructionReply {
    #[serde(alias = "close_region")]
    CloseRegions(SimpleReply),
    UpgradeRegion(UpgradeRegionReply),
    DowngradeRegion(DowngradeRegionReply),
    #[serde(
        alias = "downgrade_region",
        deserialize_with = "downgrade_regions_compat_from"
    )]
    DowngradeRegions(DowngradeRegionsReply),
    FlushRegions(FlushRegionReply),
}

@@ -470,8 +557,8 @@ impl Display for InstructionReply {
            Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
            Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
            Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
            Self::DowngradeRegion(reply) => {
                write!(f, "InstructionReply::DowngradeRegion({})", reply)
            Self::DowngradeRegions(reply) => {
                write!(f, "InstructionReply::DowngradeRegions({:?})", reply)
            }
            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
        }
@@ -493,6 +580,27 @@ impl InstructionReply {
            _ => panic!("Expected OpenRegions reply"),
        }
    }

    pub fn expect_upgrade_region_reply(self) -> UpgradeRegionReply {
        match self {
            Self::UpgradeRegion(reply) => reply,
            _ => panic!("Expected UpgradeRegion reply"),
        }
    }

    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
        match self {
            Self::DowngradeRegions(reply) => reply.replies,
            _ => panic!("Expected DowngradeRegion reply"),
        }
    }

    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        match self {
            Self::FlushRegions(reply) => reply,
            _ => panic!("Expected FlushRegions reply"),
        }
    }
}

#[cfg(test)]
@@ -532,11 +640,27 @@ mod tests {
            r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
            serialized
        );

        let downgrade_region = InstructionReply::DowngradeRegions(DowngradeRegionsReply::single(
            DowngradeRegionReply {
                region_id: RegionId::new(1024, 1),
                last_entry_id: None,
                metadata_last_entry_id: None,
                exists: true,
                error: None,
            },
        ));

        let serialized = serde_json::to_string(&downgrade_region).unwrap();
        assert_eq!(
            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
            serialized
        )
    }

    #[test]
    fn test_deserialize_instruction() {
        let open_region_instruction = r#"{"OpenRegion":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#;
        let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
        let open_region_instruction: Instruction =
            serde_json::from_str(open_region_instruction).unwrap();
        let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
@@ -553,7 +677,7 @@ mod tests {
        )]);
        assert_eq!(open_region_instruction, open_region);

        let close_region_instruction = r#"{"CloseRegion":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#;
        let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
        let close_region_instruction: Instruction =
            serde_json::from_str(close_region_instruction).unwrap();
        let close_region = Instruction::CloseRegions(vec![RegionIdent {
@@ -564,6 +688,15 @@ mod tests {
        }]);
        assert_eq!(close_region_instruction, close_region);

        let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
        let downgrade_region_instruction: Instruction =
            serde_json::from_str(downgrade_region_instruction).unwrap();
        let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(1024, 1),
            flush_timeout: Some(Duration::from_millis(1000)),
        }]);
        assert_eq!(downgrade_region_instruction, downgrade_region);

        let close_region_instruction_reply =
            r#"{"result":true,"error":null,"type":"close_region"}"#;
        let close_region_instruction_reply: InstructionReply =
@@ -582,6 +715,20 @@ mod tests {
            error: None,
        });
        assert_eq!(open_region_instruction_reply, open_region_reply);

        let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
        let downgrade_region_instruction_reply: InstructionReply =
            serde_json::from_str(downgrade_region_instruction_reply).unwrap();
        let downgrade_region_reply = InstructionReply::DowngradeRegions(
            DowngradeRegionsReply::single(DowngradeRegionReply {
                region_id: RegionId::new(1024, 1),
                last_entry_id: None,
                metadata_last_entry_id: None,
                exists: true,
                error: None,
            }),
        );
        assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
    }

#[derive(Debug, Clone, Serialize, Deserialize)]

@@ -13,16 +13,13 @@
// limitations under the License.

use async_trait::async_trait;
use common_meta::RegionIdent;
use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
use common_meta::heartbeat::handler::{
    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply};
use common_telemetry::error;
use futures::future::BoxFuture;
use snafu::OptionExt;
use store_api::storage::RegionId;

mod close_region;
mod downgrade_region;
@@ -30,10 +27,15 @@ mod flush_region;
mod open_region;
mod upgrade_region;

use crate::heartbeat::handler::close_region::CloseRegionsHandler;
use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
use crate::heartbeat::task_tracker::TaskTracker;
use crate::region_server::RegionServer;

/// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion].
/// The handler for [`Instruction`]s.
#[derive(Clone)]
pub struct RegionHeartbeatResponseHandler {
    region_server: RegionServer,
@@ -43,9 +45,14 @@ pub struct RegionHeartbeatResponseHandler {
    open_region_parallelism: usize,
}

/// Handler of the instruction.
pub type InstructionHandler =
    Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, Option<InstructionReply>> + Send>;
#[async_trait::async_trait]
pub trait InstructionHandler: Send + Sync {
    async fn handle(
        &self,
        ctx: &HandlerContext,
        instruction: Instruction,
    ) -> Option<InstructionReply>;
}

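[Editor's note] With the trait above, each instruction kind gets a small handler type (see `CloseRegionsHandler`, `DowngradeRegionsHandler`, etc. below), and `build_handler` only maps an `Instruction` variant to the matching boxed handler. A do-nothing handler, shown purely to illustrate the trait shape (hypothetical, not part of the commit):

// Hypothetical example; assumes this module's imports are in scope.
struct NoopHandler;

#[async_trait::async_trait]
impl InstructionHandler for NoopHandler {
    async fn handle(
        &self,
        _ctx: &HandlerContext,
        _instruction: Instruction,
    ) -> Option<InstructionReply> {
        // Returning `None` means nothing is sent back through the heartbeat
        // mailbox, which is how the async "hint" flush path below behaves.
        None
    }
}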
#[derive(Clone)]
pub struct HandlerContext {
@@ -56,10 +63,6 @@ pub struct HandlerContext {
}

impl HandlerContext {
    fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
        RegionId::new(region_ident.table_id, region_ident.region_number)
    }

    #[cfg(test)]
    pub fn new_for_test(region_server: RegionServer) -> Self {
        Self {
@@ -90,31 +93,16 @@ impl RegionHeartbeatResponseHandler {
        self
    }

    /// Builds the [InstructionHandler].
    fn build_handler(&self, instruction: Instruction) -> MetaResult<InstructionHandler> {
    fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<dyn InstructionHandler>> {
        match instruction {
            Instruction::OpenRegions(open_regions) => {
                let open_region_parallelism = self.open_region_parallelism;
                Ok(Box::new(move |handler_context| {
                    handler_context
                        .handle_open_regions_instruction(open_regions, open_region_parallelism)
                }))
            }
            Instruction::CloseRegions(close_regions) => Ok(Box::new(move |handler_context| {
                handler_context.handle_close_regions_instruction(close_regions)
            })),
            Instruction::DowngradeRegion(downgrade_region) => {
                Ok(Box::new(move |handler_context| {
                    handler_context.handle_downgrade_region_instruction(downgrade_region)
                }))
            }
            Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
                handler_context.handle_upgrade_region_instruction(upgrade_region)
            Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler)),
            Instruction::OpenRegions(_) => Ok(Box::new(OpenRegionsHandler {
                open_region_parallelism: self.open_region_parallelism,
            })),
            Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler)),
            Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler)),
            Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler)),
            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
            Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
                handler_context.handle_flush_regions_instruction(flush_regions)
            })),
        }
    }
}
@@ -124,7 +112,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
        matches!(ctx.incoming_message.as_ref(), |Some((
            _,
            Instruction::DowngradeRegion { .. },
            Instruction::DowngradeRegions { .. },
        ))| Some((
            _,
            Instruction::UpgradeRegion { .. }
@@ -151,15 +139,19 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
        let catchup_tasks = self.catchup_tasks.clone();
        let downgrade_tasks = self.downgrade_tasks.clone();
        let flush_tasks = self.flush_tasks.clone();
        let handler = self.build_handler(instruction)?;
        let handler = self.build_handler(&instruction)?;
        let _handle = common_runtime::spawn_global(async move {
            let reply = handler(HandlerContext {
                region_server,
                catchup_tasks,
                downgrade_tasks,
                flush_tasks,
            })
            .await;
            let reply = handler
                .handle(
                    &HandlerContext {
                        region_server,
                        catchup_tasks,
                        downgrade_tasks,
                        flush_tasks,
                    },
                    instruction,
                )
                .await;

            if let Some(reply) = reply
                && let Err(e) = mailbox.send((meta, reply)).await
@@ -179,6 +171,7 @@ mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
@@ -249,10 +242,10 @@ mod tests {
        );

        // Downgrade region
        let instruction = Instruction::DowngradeRegion(DowngradeRegion {
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        });
        }]);
        assert!(
            heartbeat_handler
                .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))
@@ -447,10 +440,10 @@ mod tests {
        // Should be ok, if we try to downgrade it twice.
        for _ in 0..2 {
            let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
            let instruction = Instruction::DowngradeRegion(DowngradeRegion {
            let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            });
            }]);

            let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
            let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
@@ -458,33 +451,27 @@ mod tests {

            let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

            if let InstructionReply::DowngradeRegion(reply) = reply {
                assert!(reply.exists);
                assert!(reply.error.is_none());
                assert_eq!(reply.last_entry_id.unwrap(), 0);
            } else {
                unreachable!()
            }
            let reply = &reply.expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 0);
        }

        // Downgrades a not exists region.
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
        let instruction = Instruction::DowngradeRegion(DowngradeRegion {
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        });
        }]);
        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

        if let InstructionReply::DowngradeRegion(reply) = reply {
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        } else {
            unreachable!()
        }
        let reply = reply.expect_downgrade_regions_reply();
        assert!(!reply[0].exists);
        assert!(reply[0].error.is_none());
        assert!(reply[0].last_entry_id.is_none());
    }
}

@@ -12,60 +12,64 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::RegionIdent;
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_telemetry::warn;
use futures::future::join_all;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::error;
use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};

impl HandlerContext {
    pub(crate) fn handle_close_regions_instruction(
        self,
        region_idents: Vec<RegionIdent>,
    ) -> BoxFuture<'static, Option<InstructionReply>> {
        Box::pin(async move {
            let region_ids = region_idents
                .into_iter()
                .map(|region_ident| Self::region_ident_to_region_id(&region_ident))
                .collect::<Vec<_>>();
#[derive(Debug, Clone, Copy, Default)]
pub struct CloseRegionsHandler;

            let futs = region_ids.iter().map(|region_id| {
                self.region_server
                    .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
            });
#[async_trait::async_trait]
impl InstructionHandler for CloseRegionsHandler {
    async fn handle(
        &self,
        ctx: &HandlerContext,
        instruction: Instruction,
    ) -> Option<InstructionReply> {
        // Safety: must be `Instruction::CloseRegions` instruction.
        let region_idents = instruction.into_close_regions().unwrap();
        let region_ids = region_idents
            .into_iter()
            .map(|region_ident| RegionId::new(region_ident.table_id, region_ident.region_number))
            .collect::<Vec<_>>();

            let results = join_all(futs).await;
        let futs = region_ids.iter().map(|region_id| {
            ctx.region_server
                .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
        });

            let mut errors = vec![];
            for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) {
                match result {
                    Ok(_) => (),
                    Err(error::Error::RegionNotFound { .. }) => {
                        warn!(
                            "Received a close regions instruction from meta, but target region:{} is not found.",
                            region_id
                        );
                    }
                    Err(err) => errors.push(format!("region:{region_id}: {err:?}")),
        let results = join_all(futs).await;

        let mut errors = vec![];
        for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) {
            match result {
                Ok(_) => (),
                Err(error::Error::RegionNotFound { .. }) => {
                    warn!(
                        "Received a close regions instruction from meta, but target region:{} is not found.",
                        region_id
                    );
                }
                Err(err) => errors.push(format!("region:{region_id}: {err:?}")),
            }
        }

            if errors.is_empty() {
                return Some(InstructionReply::CloseRegions(SimpleReply {
                    result: true,
                    error: None,
                }));
            }
        if errors.is_empty() {
            return Some(InstructionReply::CloseRegions(SimpleReply {
                result: true,
                error: None,
            }));
        }

            Some(InstructionReply::CloseRegions(SimpleReply {
                result: false,
                error: Some(errors.join("; ")),
            }))
        })
        Some(InstructionReply::CloseRegions(SimpleReply {
            result: false,
            error: Some(errors.join("; ")),
        }))
    }
}

@@ -12,209 +12,242 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
};
use common_telemetry::tracing::info;
use common_telemetry::{error, warn};
use futures_util::future::BoxFuture;
use futures::future::join_all;
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
use crate::heartbeat::task_tracker::WaitResult;

impl HandlerContext {
    async fn downgrade_to_follower_gracefully(
#[derive(Debug, Clone, Copy, Default)]
pub struct DowngradeRegionsHandler;

impl DowngradeRegionsHandler {
    async fn handle_downgrade_region(
        ctx: &HandlerContext,
        DowngradeRegion {
            region_id,
            flush_timeout,
        }: DowngradeRegion,
    ) -> DowngradeRegionReply {
        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
            warn!("Region: {region_id} is not found");
            return DowngradeRegionReply {
                region_id,
                last_entry_id: None,
                metadata_last_entry_id: None,
                exists: false,
                error: None,
            };
        };

        let region_server_moved = ctx.region_server.clone();

        // Ignores flush request
        if !writable {
            warn!(
                "Region: {region_id} is not writable, flush_timeout: {:?}",
                flush_timeout
            );
            return ctx.downgrade_to_follower_gracefully(region_id).await;
        }

        // If flush_timeout is not set, directly convert region to follower.
        let Some(flush_timeout) = flush_timeout else {
            return ctx.downgrade_to_follower_gracefully(region_id).await;
        };

        // Sets region to downgrading,
        // the downgrading region will reject all write requests.
        // However, the downgrading region will still accept read, flush requests.
        match ctx
            .region_server
            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
            .await
        {
            Ok(SetRegionRoleStateResponse::Success { .. }) => {}
            Ok(SetRegionRoleStateResponse::NotFound) => {
                warn!("Region: {region_id} is not found");
                return DowngradeRegionReply {
                    region_id,
                    last_entry_id: None,
                    metadata_last_entry_id: None,
                    exists: false,
                    error: None,
                };
            }
            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
                error!(err; "Failed to convert region to downgrading leader - invalid transition");
                return DowngradeRegionReply {
                    region_id,
                    last_entry_id: None,
                    metadata_last_entry_id: None,
                    exists: true,
                    error: Some(format!("{err:?}")),
                };
            }
            Err(err) => {
                error!(err; "Failed to convert region to downgrading leader");
                return DowngradeRegionReply {
                    region_id,
                    last_entry_id: None,
                    metadata_last_entry_id: None,
                    exists: true,
                    error: Some(format!("{err:?}")),
                };
            }
        }

        let register_result = ctx
            .downgrade_tasks
            .try_register(
                region_id,
                Box::pin(async move {
                    info!("Flush region: {region_id} before converting region to follower");
                    region_server_moved
                        .handle_request(
                            region_id,
                            RegionRequest::Flush(RegionFlushRequest {
                                row_group_size: None,
                            }),
                        )
                        .await?;

                    Ok(())
                }),
            )
            .await;

        if register_result.is_busy() {
            warn!("Another flush task is running for the region: {region_id}");
        }

        let mut watcher = register_result.into_watcher();
        let result = ctx.downgrade_tasks.wait(&mut watcher, flush_timeout).await;

        match result {
            WaitResult::Timeout => DowngradeRegionReply {
                region_id,
                last_entry_id: None,
                metadata_last_entry_id: None,
                exists: true,
                error: Some(format!(
                    "Flush region timeout, region: {region_id}, timeout: {:?}",
                    flush_timeout
                )),
            },
            WaitResult::Finish(Ok(_)) => ctx.downgrade_to_follower_gracefully(region_id).await,
            WaitResult::Finish(Err(err)) => DowngradeRegionReply {
                region_id,
                last_entry_id: None,
                metadata_last_entry_id: None,
                exists: true,
                error: Some(format!("{err:?}")),
            },
        }
    }
}

#[async_trait::async_trait]
impl InstructionHandler for DowngradeRegionsHandler {
    async fn handle(
        &self,
        region_id: RegionId,
        ctx: &HandlerContext,
        instruction: Instruction,
    ) -> Option<InstructionReply> {
        // Safety: must be `Instruction::DowngradeRegion` instruction.
        let downgrade_regions = instruction.into_downgrade_regions().unwrap();
        let futures = downgrade_regions
            .into_iter()
            .map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
        // Join all futures; parallelism is governed by the underlying flush scheduler.
        let results = join_all(futures).await;

        Some(InstructionReply::DowngradeRegions(
            DowngradeRegionsReply::new(results),
        ))
    }
}

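[Editor's note] On the batching above: `futures::future::join_all` yields its results in the same order as the input futures, so the replies line up with the requested regions (each `DowngradeRegionReply` also carries its `region_id` explicitly, which the `test_downgrade_regions` test below checks). A standalone illustration of that ordering guarantee, unrelated to GreptimeDB types:

// Standalone sketch: join_all preserves input order even when futures
// complete out of order.
use std::time::Duration;

use futures::future::join_all;

#[tokio::main]
async fn main() {
    let tasks = (0..3u64).map(|i| async move {
        // Later inputs finish first, on purpose.
        tokio::time::sleep(Duration::from_millis(30 - 10 * i)).await;
        i
    });
    assert_eq!(join_all(tasks).await, vec![0, 1, 2]);
}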
impl HandlerContext {
    async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> DowngradeRegionReply {
        match self
            .region_server
            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
            .await
        {
            Ok(SetRegionRoleStateResponse::Success(success)) => {
                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                    last_entry_id: success.last_entry_id(),
                    metadata_last_entry_id: success.metadata_last_entry_id(),
                    exists: true,
                    error: None,
                }))
            }
            Ok(SetRegionRoleStateResponse::Success(success)) => DowngradeRegionReply {
                region_id,
                last_entry_id: success.last_entry_id(),
                metadata_last_entry_id: success.metadata_last_entry_id(),
                exists: true,
                error: None,
            },
            Ok(SetRegionRoleStateResponse::NotFound) => {
                warn!("Region: {region_id} is not found");
                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                DowngradeRegionReply {
                    region_id,
                    last_entry_id: None,
                    metadata_last_entry_id: None,
                    exists: false,
                    error: None,
                }))
                }
            }
            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
                error!(err; "Failed to convert region to follower - invalid transition");
                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                DowngradeRegionReply {
                    region_id,
                    last_entry_id: None,
                    metadata_last_entry_id: None,
                    exists: true,
                    error: Some(format!("{err:?}")),
                }))
                }
            }
            Err(err) => {
                error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                DowngradeRegionReply {
                    region_id,
                    last_entry_id: None,
                    metadata_last_entry_id: None,
                    exists: true,
                    error: Some(format!("{err:?}")),
                }))
                }
            }
        }
    }

    pub(crate) fn handle_downgrade_region_instruction(
        self,
        DowngradeRegion {
            region_id,
            flush_timeout,
        }: DowngradeRegion,
    ) -> BoxFuture<'static, Option<InstructionReply>> {
        Box::pin(async move {
            let Some(writable) = self.region_server.is_region_leader(region_id) else {
                warn!("Region: {region_id} is not found");
                return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                    last_entry_id: None,
                    metadata_last_entry_id: None,
                    exists: false,
                    error: None,
                }));
            };

            let region_server_moved = self.region_server.clone();

            // Ignores flush request
            if !writable {
                warn!(
                    "Region: {region_id} is not writable, flush_timeout: {:?}",
                    flush_timeout
                );
                return self.downgrade_to_follower_gracefully(region_id).await;
            }

            // If flush_timeout is not set, directly convert region to follower.
            let Some(flush_timeout) = flush_timeout else {
                return self.downgrade_to_follower_gracefully(region_id).await;
            };

            // Sets region to downgrading,
            // the downgrading region will reject all write requests.
            // However, the downgrading region will still accept read, flush requests.
            match self
                .region_server
                .set_region_role_state_gracefully(
                    region_id,
                    SettableRegionRoleState::DowngradingLeader,
                )
                .await
            {
                Ok(SetRegionRoleStateResponse::Success { .. }) => {}
                Ok(SetRegionRoleStateResponse::NotFound) => {
                    warn!("Region: {region_id} is not found");
                    return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                        last_entry_id: None,
                        metadata_last_entry_id: None,
                        exists: false,
                        error: None,
                    }));
                }
                Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
                    error!(err; "Failed to convert region to downgrading leader - invalid transition");
                    return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                        last_entry_id: None,
                        metadata_last_entry_id: None,
                        exists: true,
                        error: Some(format!("{err:?}")),
                    }));
                }
                Err(err) => {
                    error!(err; "Failed to convert region to downgrading leader");
                    return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                        last_entry_id: None,
                        metadata_last_entry_id: None,
                        exists: true,
                        error: Some(format!("{err:?}")),
                    }));
                }
            }

            let register_result = self
                .downgrade_tasks
                .try_register(
                    region_id,
                    Box::pin(async move {
                        info!("Flush region: {region_id} before converting region to follower");
                        region_server_moved
                            .handle_request(
                                region_id,
                                RegionRequest::Flush(RegionFlushRequest {
                                    row_group_size: None,
                                }),
                            )
                            .await?;

                        Ok(())
                    }),
                )
                .await;

            if register_result.is_busy() {
                warn!("Another flush task is running for the region: {region_id}");
            }

            let mut watcher = register_result.into_watcher();
            let result = self.downgrade_tasks.wait(&mut watcher, flush_timeout).await;

            match result {
                WaitResult::Timeout => {
                    Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                        last_entry_id: None,
                        metadata_last_entry_id: None,
                        exists: true,
                        error: Some(format!(
                            "Flush region timeout, region: {region_id}, timeout: {:?}",
                            flush_timeout
                        )),
                    }))
                }
                WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
                WaitResult::Finish(Err(err)) => {
                    Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
                        last_entry_id: None,
                        metadata_last_entry_id: None,
                        exists: true,
                        error: Some(format!("{err:?}")),
                    }))
                }
            }
        })
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::instruction::{DowngradeRegion, InstructionReply};
    use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
    use common_meta::heartbeat::mailbox::MessageMeta;
    use common_meta::instruction::{DowngradeRegion, Instruction};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::{
        RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
        RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::HandlerContext;
    use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
    use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
    use crate::heartbeat::handler::{
        HandlerContext, InstructionHandler, RegionHeartbeatResponseHandler,
    };
    use crate::tests::{MockRegionEngine, mock_region_server};

    #[tokio::test]
@@ -227,20 +260,20 @@ mod tests {
        let waits = vec![None, Some(Duration::from_millis(100u64))];

        for flush_timeout in waits {
            let reply = handler_context
                .clone()
                .handle_downgrade_region_instruction(DowngradeRegion {
                    region_id,
                    flush_timeout,
                })
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    Instruction::DowngradeRegions(vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }]),
                )
                .await;
            assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));

            if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
                assert!(!reply.exists);
                assert!(reply.error.is_none());
                assert!(reply.last_entry_id.is_none());
            }
            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

@@ -270,20 +303,20 @@ mod tests {

        let waits = vec![None, Some(Duration::from_millis(100u64))];
        for flush_timeout in waits {
            let reply = handler_context
                .clone()
                .handle_downgrade_region_instruction(DowngradeRegion {
                    region_id,
                    flush_timeout,
                })
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    Instruction::DowngradeRegions(vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }]),
                )
                .await;
            assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));

            if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
                assert!(reply.exists);
                assert!(reply.error.is_none());
                assert_eq!(reply.last_entry_id.unwrap(), 1024);
            }
            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
    }

@@ -305,20 +338,20 @@ mod tests {
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_timeout = Duration::from_millis(100);
        let reply = handler_context
            .clone()
            .handle_downgrade_region_instruction(DowngradeRegion {
                region_id,
                flush_timeout: Some(flush_timeout),
            })
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                Instruction::DowngradeRegions(vec![DowngradeRegion {
                    region_id,
                    flush_timeout: Some(flush_timeout),
                }]),
            )
            .await;
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));

        if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
            assert!(reply.exists);
            assert!(reply.error.unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("timeout"));
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
@@ -344,36 +377,38 @@ mod tests {
        ];

        for flush_timeout in waits {
            let reply = handler_context
                .clone()
                .handle_downgrade_region_instruction(DowngradeRegion {
                    region_id,
                    flush_timeout,
                })
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    Instruction::DowngradeRegions(vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }]),
                )
                .await;
            assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
            if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
                assert!(reply.exists);
                assert!(reply.error.unwrap().contains("timeout"));
                assert!(reply.last_entry_id.is_none());
            }

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }
        let timer = Instant::now();
        let reply = handler_context
            .handle_downgrade_region_instruction(DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_millis(500)),
            })
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                Instruction::DowngradeRegions(vec![DowngradeRegion {
                    region_id,
                    flush_timeout: Some(Duration::from_millis(500)),
                }]),
            )
            .await;
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
        // Must less than 300 ms.
        assert!(timer.elapsed().as_millis() < 300);

        if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert_eq!(reply.last_entry_id.unwrap(), 1024);
    }

    #[tokio::test]
@@ -405,36 +440,36 @@ mod tests {
        ];

        for flush_timeout in waits {
            let reply = handler_context
                .clone()
                .handle_downgrade_region_instruction(DowngradeRegion {
                    region_id,
                    flush_timeout,
                })
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    Instruction::DowngradeRegions(vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }]),
                )
                .await;
            assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
            if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
                assert!(reply.exists);
                assert!(reply.error.unwrap().contains("timeout"));
                assert!(reply.last_entry_id.is_none());
            }
        }
        let timer = Instant::now();
        let reply = handler_context
            .handle_downgrade_region_instruction(DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_millis(500)),
            })
            .await;
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
        // Must less than 300 ms.
        assert!(timer.elapsed().as_millis() < 300);

        if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.unwrap().contains("flush failed"));
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }
        let timer = Instant::now();
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                Instruction::DowngradeRegions(vec![DowngradeRegion {
                    region_id,
                    flush_timeout: Some(Duration::from_millis(500)),
                }]),
            )
            .await;
        // Must less than 300 ms.
        assert!(timer.elapsed().as_millis() < 300);
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("flush failed"));
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
@@ -449,19 +484,19 @@ mod tests {
        });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);
        let reply = handler_context
            .clone()
            .handle_downgrade_region_instruction(DowngradeRegion {
                region_id,
                flush_timeout: None,
            })
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                Instruction::DowngradeRegions(vec![DowngradeRegion {
                    region_id,
                    flush_timeout: None,
                }]),
            )
            .await;
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
        if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
@@ -480,23 +515,77 @@ mod tests {
        });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);
        let reply = handler_context
            .clone()
            .handle_downgrade_region_instruction(DowngradeRegion {
                region_id,
                flush_timeout: None,
            })
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                Instruction::DowngradeRegions(vec![DowngradeRegion {
                    region_id,
                    flush_timeout: None,
                }]),
            )
            .await;
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
        if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
            assert!(reply.exists);
            assert!(
                reply
                    .error
                    .unwrap()
                    .contains("Failed to set region to readonly")
            );
            assert!(reply.last_entry_id.is_none());
        }
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(
            reply
                .error
                .as_ref()
                .unwrap()
                .contains("Failed to set region to readonly")
        );
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_regions() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let mut engine_env = TestEnv::with_prefix("downgrade-regions").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        let region_id = RegionId::new(1024, 1);
        let region_id1 = RegionId::new(1024, 2);
        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        let create_req1 = builder.build();
        region_server
            .handle_request(region_id1, RegionRequest::Create(create_req1))
            .await
            .unwrap();
        let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
        let instruction = Instruction::DowngradeRegions(vec![
            DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            },
            DowngradeRegion {
                region_id: region_id1,
                flush_timeout: Some(Duration::from_secs(1)),
            },
        ]);
        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
        let reply = reply.expect_downgrade_regions_reply();
        assert_eq!(reply[0].region_id, region_id);
        assert!(reply[0].exists);
        assert!(reply[0].error.is_none());
        assert_eq!(reply[0].last_entry_id, Some(0));
        assert_eq!(reply[1].region_id, region_id1);
        assert!(reply[1].exists);
        assert!(reply[1].error.is_none());
        assert_eq!(reply[1].last_entry_id, Some(0));

        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
        assert_eq!(engine.role(region_id1).unwrap(), RegionRole::Follower);
    }
}

@@ -15,19 +15,53 @@
use std::time::Instant;

use common_meta::instruction::{
    FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
    FlushErrorStrategy, FlushRegionReply, FlushStrategy, Instruction, InstructionReply,
};
use common_telemetry::{debug, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, UnexpectedSnafu};
use crate::heartbeat::handler::HandlerContext;
use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};

pub struct FlushRegionsHandler;

#[async_trait::async_trait]
impl InstructionHandler for FlushRegionsHandler {
    async fn handle(
        &self,
        ctx: &HandlerContext,
        instruction: Instruction,
    ) -> Option<InstructionReply> {
        let start_time = Instant::now();
        let flush_regions = instruction.into_flush_regions().unwrap();
        let strategy = flush_regions.strategy;
        let region_ids = flush_regions.region_ids;
        let error_strategy = flush_regions.error_strategy;

        let reply = if matches!(strategy, FlushStrategy::Async) {
            // Asynchronous hint mode: fire-and-forget, no reply expected
            ctx.handle_flush_hint(region_ids).await;
            None
        } else {
            // Synchronous mode: return reply with results
            let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
            Some(InstructionReply::FlushRegions(reply))
        };

        let elapsed = start_time.elapsed();
        debug!(
            "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
            strategy, elapsed, reply
        );

        reply
    }
}

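[Editor's note] The handler above keeps the two flush semantics from the previous closure-based code: an asynchronous "hint" flush produces no reply, while a synchronous flush reports per-region results. A hypothetical test fragment using the `FlushRegions::async_batch` / `sync_single` constructors exercised by the tests below (setup elided):

// Hypothetical fragment; assumes this module's test imports and a
// `handler_context` built with `HandlerContext::new_for_test(...)`.
async fn flush_modes_sketch(handler_context: HandlerContext, region_id: RegionId) {
    // Async "hint" mode: fire-and-forget, the datanode sends no reply.
    let hint = Instruction::FlushRegions(FlushRegions::async_batch(vec![region_id]));
    assert!(FlushRegionsHandler.handle(&handler_context, hint).await.is_none());

    // Sync mode: a FlushRegionReply with per-region results comes back.
    let sync = Instruction::FlushRegions(FlushRegions::sync_single(region_id));
    assert!(FlushRegionsHandler.handle(&handler_context, sync).await.is_some());
}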
impl HandlerContext {
|
||||
/// Performs the actual region flush operation.
|
||||
async fn perform_region_flush(&self, region_id: RegionId) -> Result<(), error::Error> {
|
||||
async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
|
||||
let request = RegionRequest::Flush(RegionFlushRequest {
|
||||
row_group_size: None,
|
||||
});
|
||||
@@ -92,7 +126,7 @@ impl HandlerContext {
|
||||
}
|
||||
|
||||
/// Flushes a single region synchronously with proper error handling.
|
||||
async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<(), error::Error> {
|
||||
async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
|
||||
// Check if region is leader and writable
|
||||
let Some(writable) = self.region_server.is_region_leader(region_id) else {
|
||||
return Err(RegionNotFoundSnafu { region_id }.build());
|
||||
@@ -135,37 +169,6 @@ impl HandlerContext {
|
||||
.build()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Unified handler for FlushRegions with all flush semantics.
|
||||
pub(crate) fn handle_flush_regions_instruction(
|
||||
self,
|
||||
flush_regions: FlushRegions,
|
||||
) -> BoxFuture<'static, Option<InstructionReply>> {
|
||||
Box::pin(async move {
|
||||
let start_time = Instant::now();
|
||||
let strategy = flush_regions.strategy;
|
||||
let region_ids = flush_regions.region_ids;
|
||||
let error_strategy = flush_regions.error_strategy;
|
||||
|
||||
let reply = if matches!(strategy, FlushStrategy::Async) {
|
||||
// Asynchronous hint mode: fire-and-forget, no reply expected
|
||||
self.handle_flush_hint(region_ids).await;
|
||||
None
|
||||
} else {
|
||||
// Synchronous mode: return reply with results
|
||||
let reply = self.handle_flush_sync(region_ids, error_strategy).await;
|
||||
Some(InstructionReply::FlushRegions(reply))
|
||||
};
|
||||
|
||||
let elapsed = start_time.elapsed();
|
||||
debug!(
|
||||
"FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
|
||||
strategy, elapsed, reply
|
||||
);
|
||||
|
||||
reply
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -201,9 +204,11 @@ mod tests {
|
||||
|
||||
// Async hint mode
|
||||
let flush_instruction = FlushRegions::async_batch(region_ids.clone());
|
||||
let reply = handler_context
|
||||
.clone()
|
||||
.handle_flush_regions_instruction(flush_instruction)
|
||||
let reply = FlushRegionsHandler
|
||||
.handle(
|
||||
&handler_context,
|
||||
Instruction::FlushRegions(flush_instruction),
|
||||
)
|
||||
.await;
|
||||
assert!(reply.is_none()); // Hint mode returns no reply
|
||||
assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
|
||||
@@ -212,8 +217,11 @@ mod tests {
|
||||
flushed_region_ids.write().unwrap().clear();
|
||||
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
|
||||
let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
|
||||
let reply = handler_context
|
||||
.handle_flush_regions_instruction(flush_instruction)
|
||||
let reply = FlushRegionsHandler
|
||||
.handle(
|
||||
&handler_context,
|
||||
Instruction::FlushRegions(flush_instruction),
|
||||
)
|
||||
.await;
|
||||
assert!(reply.is_none());
|
||||
assert!(flushed_region_ids.read().unwrap().is_empty());
|
||||
@@ -238,20 +246,17 @@ mod tests {
let handler_context = HandlerContext::new_for_test(mock_region_server);

let flush_instruction = FlushRegions::sync_single(region_id);
let reply = handler_context
.handle_flush_regions_instruction(flush_instruction)
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.await;

assert!(reply.is_some());
if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
assert!(flush_reply.overall_success);
assert_eq!(flush_reply.results.len(), 1);
assert_eq!(flush_reply.results[0].0, region_id);
assert!(flush_reply.results[0].1.is_ok());
} else {
panic!("Expected FlushRegions reply");
}

let flush_reply = reply.unwrap().expect_flush_regions_reply();
assert!(flush_reply.overall_success);
assert_eq!(flush_reply.results.len(), 1);
assert_eq!(flush_reply.results[0].0, region_id);
assert!(flush_reply.results[0].1.is_ok());
assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
}

@@ -281,18 +286,16 @@ mod tests {
// Sync batch with fail-fast strategy
let flush_instruction =
FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
let reply = handler_context
.handle_flush_regions_instruction(flush_instruction)
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.await;

assert!(reply.is_some());
if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
assert!(!flush_reply.overall_success); // Should fail due to non-existent regions
// With fail-fast, only process regions until first failure
assert!(flush_reply.results.len() <= region_ids.len());
} else {
panic!("Expected FlushRegions reply");
}
let flush_reply = reply.unwrap().expect_flush_regions_reply();
assert!(!flush_reply.overall_success); // Should fail due to non-existent regions
// With fail-fast, only process regions until first failure
assert!(flush_reply.results.len() <= region_ids.len());
}

#[tokio::test]
@@ -317,20 +320,18 @@ mod tests {
// Sync batch with try-all strategy
let flush_instruction =
FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
let reply = handler_context
.handle_flush_regions_instruction(flush_instruction)
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.await;

assert!(reply.is_some());
if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
assert!(!flush_reply.overall_success); // Should fail due to one non-existent region
// With try-all, should process all regions
assert_eq!(flush_reply.results.len(), region_ids.len());
// First should succeed, second should fail
assert!(flush_reply.results[0].1.is_ok());
assert!(flush_reply.results[1].1.is_err());
} else {
panic!("Expected FlushRegions reply");
}
let flush_reply = reply.unwrap().expect_flush_regions_reply();
assert!(!flush_reply.overall_success); // Should fail due to one non-existent region
// With try-all, should process all regions
assert_eq!(flush_reply.results.len(), region_ids.len());
// First should succeed, second should fail
assert!(flush_reply.results[0].1.is_ok());
assert!(flush_reply.results[1].1.is_err());
}
}

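Note: the updated tests above rely on an accessor, `expect_flush_regions_reply()`, whose definition is not part of this excerpt. A minimal sketch of what such an accessor presumably looks like, written as it would read next to the `InstructionReply` enum in `common_meta::instruction`, is shown below; the exact signature and panic message are assumptions, not taken from the diff.

impl InstructionReply {
    /// Hypothetical convenience accessor assumed by the tests above; the real
    /// helper may differ in naming, return type, or error handling.
    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
        match self {
            InstructionReply::FlushRegions(flush_reply) => flush_reply,
            other => panic!("expected an InstructionReply::FlushRegions, got: {other:?}"),
        }
    }
}
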
@@ -12,56 +12,62 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_options_allocator::prepare_wal_options;
use futures_util::future::BoxFuture;
use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest};
use store_api::storage::RegionId;

use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};

impl HandlerContext {
pub(crate) fn handle_open_regions_instruction(
self,
open_regions: Vec<OpenRegion>,
open_region_parallelism: usize,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let requests = open_regions
.into_iter()
.map(|open_region| {
let OpenRegion {
region_ident,
region_storage_path,
mut region_options,
region_wal_options,
skip_wal_replay,
} = open_region;
let region_id = Self::region_ident_to_region_id(&region_ident);
prepare_wal_options(&mut region_options, region_id, &region_wal_options);
let request = RegionOpenRequest {
engine: region_ident.engine,
table_dir: table_dir(&region_storage_path, region_id.table_id()),
path_type: PathType::Bare,
options: region_options,
skip_wal_replay,
checkpoint: None,
};
(region_id, request)
})
.collect::<Vec<_>>();
pub struct OpenRegionsHandler {
pub open_region_parallelism: usize,
}

let result = self
.region_server
.handle_batch_open_requests(open_region_parallelism, requests, false)
.await;
let success = result.is_ok();
let error = result.as_ref().map_err(|e| format!("{e:?}")).err();
Some(InstructionReply::OpenRegions(SimpleReply {
result: success,
error,
}))
})
#[async_trait::async_trait]
impl InstructionHandler for OpenRegionsHandler {
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
) -> Option<InstructionReply> {
let open_regions = instruction.into_open_regions().unwrap();

let requests = open_regions
.into_iter()
.map(|open_region| {
let OpenRegion {
region_ident,
region_storage_path,
mut region_options,
region_wal_options,
skip_wal_replay,
} = open_region;
let region_id = RegionId::new(region_ident.table_id, region_ident.region_number);
prepare_wal_options(&mut region_options, region_id, &region_wal_options);
let request = RegionOpenRequest {
engine: region_ident.engine,
table_dir: table_dir(&region_storage_path, region_id.table_id()),
path_type: PathType::Bare,
options: region_options,
skip_wal_replay,
checkpoint: None,
};
(region_id, request)
})
.collect::<Vec<_>>();

let result = ctx
.region_server
.handle_batch_open_requests(self.open_region_parallelism, requests, false)
.await;
let success = result.is_ok();
let error = result.as_ref().map_err(|e| format!("{e:?}")).err();

Some(InstructionReply::OpenRegions(SimpleReply {
result: success,
error,
}))
}
}

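Note: `OpenRegionsHandler` above implements an `InstructionHandler` trait whose definition is not included in this excerpt. Judging from the impl blocks in this commit, the trait is presumably close to the following sketch, written as if it lived alongside `HandlerContext` in `crate::heartbeat::handler`; the `Send + Sync` bounds and doc comment are assumptions.

use common_meta::instruction::{Instruction, InstructionReply};

// Sketch of the trait implied by the handlers in this commit; the real
// definition may carry extra bounds or methods.
#[async_trait::async_trait]
pub trait InstructionHandler: Send + Sync {
    /// Handles one heartbeat instruction and optionally produces a reply.
    async fn handle(
        &self,
        ctx: &HandlerContext,
        instruction: Instruction,
    ) -> Option<InstructionReply>;
}
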
@@ -12,18 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_telemetry::{info, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};

use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
use crate::heartbeat::task_tracker::WaitResult;

impl HandlerContext {
pub(crate) fn handle_upgrade_region_instruction(
self,
UpgradeRegion {
#[derive(Debug, Clone, Copy, Default)]
pub struct UpgradeRegionsHandler;

#[async_trait::async_trait]
impl InstructionHandler for UpgradeRegionsHandler {
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
) -> Option<InstructionReply> {
let UpgradeRegion {
region_id,
last_entry_id,
metadata_last_entry_id,
@@ -31,116 +37,116 @@ impl HandlerContext {
location_id,
replay_entry_id,
metadata_replay_entry_id,
}: UpgradeRegion,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: false,
error: None,
}));
};
} = instruction.into_upgrade_regions().unwrap();

if writable {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: false,
error: None,
}));
};

if writable {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}));
}

let region_server_moved = ctx.region_server.clone();

let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
(Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
entry_id,
metadata_entry_id,
}),
_ => None,
};

// The catchup task is almost zero cost if the inside region is writable.
// Therefore, it always registers a new catchup task.
let register_result = ctx
.catchup_tasks
.try_register(
region_id,
Box::pin(async move {
info!(
"Executing region: {region_id} catchup to: last entry id {last_entry_id:?}"
);
region_server_moved
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
metadata_entry_id: metadata_last_entry_id,
location_id,
checkpoint,
}),
)
.await?;

Ok(())
}),
)
.await;

if register_result.is_busy() {
warn!("Another catchup task is running for the region: {region_id}");
}

// Returns immediately
let Some(replay_timeout) = replay_timeout else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
}));
};

// We don't care that it returns a newly registered or running task.
let mut watcher = register_result.into_watcher();
let result = ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await;

match result {
WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
})),
WaitResult::Finish(Ok(_)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}));
}))
}

let region_server_moved = self.region_server.clone();

let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
(Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
entry_id,
metadata_entry_id,
}),
_ => None,
};

// The catchup task is almost zero cost if the inside region is writable.
// Therefore, it always registers a new catchup task.
let register_result = self
.catchup_tasks
.try_register(
region_id,
Box::pin(async move {
info!("Executing region: {region_id} catchup to: last entry id {last_entry_id:?}");
region_server_moved
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
metadata_entry_id: metadata_last_entry_id,
location_id,
checkpoint,
}),
)
.await?;

Ok(())
}),
)
.await;

if register_result.is_busy() {
warn!("Another catchup task is running for the region: {region_id}");
}

// Returns immediately
let Some(replay_timeout) = replay_timeout else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
WaitResult::Finish(Err(err)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
}));
};

// We don't care that it returns a newly registered or running task.
let mut watcher = register_result.into_watcher();
let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await;

match result {
WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
})),
WaitResult::Finish(Ok(_)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}))
}
WaitResult::Finish(Err(err)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: Some(format!("{err:?}")),
}))
}
error: Some(format!("{err:?}")),
}))
}
})
}
}
}

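Note: the `checkpoint` construction above only yields a `ReplayCheckpoint` when a data `replay_entry_id` is present; a metadata-only id produces no checkpoint. A small standalone sketch of the same mapping follows, for illustration only; the `u64` entry-id types are an assumption, and the free function itself is not part of the diff.

use store_api::region_request::ReplayCheckpoint;

// Mirrors the match in the handler above: a checkpoint is built only when the
// data entry id is known; the metadata entry id stays optional.
fn to_replay_checkpoint(
    replay_entry_id: Option<u64>,
    metadata_replay_entry_id: Option<u64>,
) -> Option<ReplayCheckpoint> {
    match (replay_entry_id, metadata_replay_entry_id) {
        (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
            entry_id,
            metadata_entry_id,
        }),
        _ => None,
    }
}
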
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;

use common_meta::instruction::{InstructionReply, UpgradeRegion};
use common_meta::instruction::{Instruction, UpgradeRegion};
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use tokio::time::Instant;

use crate::error;
use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
use crate::tests::{MockRegionEngine, mock_region_server};

#[tokio::test]
@@ -155,20 +161,20 @@ mod tests {
let waits = vec![None, Some(Duration::from_millis(100u64))];

for replay_timeout in waits {
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
})
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
)
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.exists);
assert!(reply.error.is_none());
}
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.exists);
assert!(reply.error.is_none());
}
}

@@ -192,21 +198,21 @@ mod tests {
let waits = vec![None, Some(Duration::from_millis(100u64))];

for replay_timeout in waits {
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
})
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
)
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
}

@@ -230,21 +236,21 @@ mod tests {
let waits = vec![None, Some(Duration::from_millis(100u64))];

for replay_timeout in waits {
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
})
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
)
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
}

@@ -271,40 +277,41 @@ mod tests {
let handler_context = HandlerContext::new_for_test(mock_region_server);

for replay_timeout in waits {
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
})
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
)
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
}

let timer = Instant::now();
let reply = handler_context
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(500)),
..Default::default()
})
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);

if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(reply.ready);
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}

let timer = Instant::now();
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(500)),
..Default::default()
}),
)
.await;
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);

let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}

#[tokio::test]
@@ -329,37 +336,37 @@ mod tests {

let handler_context = HandlerContext::new_for_test(mock_region_server);

let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
..Default::default()
})
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
region_id,
..Default::default()
}),
)
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

// It didn't wait for handle returns; it had no idea about the error.
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());

let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(200)),
..Default::default()
})
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(200)),
..Default::default()
}),
)
.await;
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_some());
assert!(reply.error.unwrap().contains("mock_error"));
}
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_some());
assert!(reply.error.unwrap().contains("mock_error"));
}
}

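Note: the tests above repeat the same `UpgradeRegionsHandler.handle(...)` invocation several times; a small test-only wrapper like the sketch below (not part of the diff, all names other than those already in the commit are hypothetical) would express that call once.

use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion};

// Hypothetical test helper; assumes UpgradeRegionsHandler, HandlerContext and
// InstructionHandler are in scope as in the test module above.
async fn handle_upgrade(
    ctx: &HandlerContext,
    upgrade: UpgradeRegion,
) -> Option<InstructionReply> {
    UpgradeRegionsHandler
        .handle(ctx, Instruction::UpgradeRegion(upgrade))
        .await
}
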
@@ -19,7 +19,7 @@ use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
@@ -120,10 +120,10 @@ impl DowngradeLeaderRegion {
) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::DowngradeRegion(DowngradeRegion {
Instruction::DowngradeRegions(vec![DowngradeRegion {
region_id,
flush_timeout: Some(flush_timeout),
})
}])
}

/// Tries to downgrade a leader region.
@@ -173,12 +173,7 @@ impl DowngradeLeaderRegion {
region_id,
now.elapsed()
);
let InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
metadata_last_entry_id,
exists,
error,
}) = reply
let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
@@ -187,6 +182,15 @@ impl DowngradeLeaderRegion {
.fail();
};

// TODO(weny): handle multiple replies.
let DowngradeRegionReply {
region_id,
last_entry_id,
metadata_last_entry_id,
exists,
error,
} = &replies[0];

if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
@@ -216,12 +220,12 @@ impl DowngradeLeaderRegion {
}

if let Some(last_entry_id) = last_entry_id {
ctx.volatile_ctx.set_last_entry_id(last_entry_id);
ctx.volatile_ctx.set_last_entry_id(*last_entry_id);
}

if let Some(metadata_last_entry_id) = metadata_last_entry_id {
ctx.volatile_ctx
.set_metadata_last_entry_id(metadata_last_entry_id);
.set_metadata_last_entry_id(*metadata_last_entry_id);
}

Ok(())

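Note: the procedure above still sends a single-element `DowngradeRegions` batch and reads only `replies[0]` (see the TODO). The same constructor generalizes to several regions; a hedged sketch follows, with the free function, the region-id slice, and the timeout value being placeholders rather than code from the commit.

use std::time::Duration;

use common_meta::instruction::{DowngradeRegion, Instruction};
use store_api::storage::RegionId;

// Illustrative only: builds one batched downgrade instruction for many regions,
// mirroring the single-region construction in the hunk above.
fn build_batch_downgrade(region_ids: &[RegionId], flush_timeout: Duration) -> Instruction {
    Instruction::DowngradeRegions(
        region_ids
            .iter()
            .map(|&region_id| DowngradeRegion {
                region_id,
                flush_timeout: Some(flush_timeout),
            })
            .collect(),
    )
}
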
@@ -17,7 +17,8 @@ use std::collections::HashMap;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
use common_meta::instruction::{
DowngradeRegionReply, FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
DowngradeRegionReply, DowngradeRegionsReply, FlushRegionReply, InstructionReply, SimpleReply,
UpgradeRegionReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
@@ -183,12 +184,15 @@ pub fn new_downgrade_region_reply(
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
metadata_last_entry_id: None,
exists: exist,
error,
}))
serde_json::to_string(&InstructionReply::DowngradeRegions(
DowngradeRegionsReply::new(vec![DowngradeRegionReply {
region_id: RegionId::new(0, 0),
last_entry_id,
metadata_last_entry_id: None,
exists: exist,
error,
}]),
))
.unwrap(),
)),
}

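Note: the helper above wraps the single reply through `DowngradeRegionsReply::new`, which is not shown in this excerpt. It is presumably a thin constructor over the `replies` field, roughly as sketched below, written as it would read next to the struct definition in `common_meta::instruction`; the actual definition may differ.

impl DowngradeRegionsReply {
    /// Hedged guess at the constructor used by new_downgrade_region_reply above.
    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
        Self { replies }
    }
}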