mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 02:10:38 +00:00
feat: introduce OpenRegions and CloseRegions instructions to support batch region operations (#7122)
* feat: introduce `OpenRegions` and `CloseRegions` instructions to support batch region operations Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: merge instructions Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -24,7 +24,7 @@ async fn test_heartbeat_mailbox() {
|
||||
let mailbox = HeartbeatMailbox::new(tx);
|
||||
|
||||
let meta = MessageMeta::new_test(1, "test", "foo", "bar");
|
||||
let reply = InstructionReply::OpenRegion(SimpleReply {
|
||||
let reply = InstructionReply::OpenRegions(SimpleReply {
|
||||
result: true,
|
||||
error: None,
|
||||
});
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::collections::HashMap;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use strum::Display;
|
||||
use table::metadata::TableId;
|
||||
@@ -394,16 +394,33 @@ impl From<RegionId> for FlushRegions {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
enum SingleOrMultiple<T> {
|
||||
Single(T),
|
||||
Multiple(Vec<T>),
|
||||
}
|
||||
|
||||
fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
T: Deserialize<'de>,
|
||||
{
|
||||
let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
|
||||
Ok(match helper {
|
||||
SingleOrMultiple::Single(x) => vec![x],
|
||||
SingleOrMultiple::Multiple(xs) => xs,
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
|
||||
pub enum Instruction {
|
||||
/// Opens a region.
|
||||
///
|
||||
/// - Returns true if a specified region exists.
|
||||
OpenRegion(OpenRegion),
|
||||
/// Closes a region.
|
||||
///
|
||||
/// - Returns true if a specified region does not exist.
|
||||
CloseRegion(RegionIdent),
|
||||
/// Opens regions.
|
||||
#[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
|
||||
OpenRegions(Vec<OpenRegion>),
|
||||
/// Closes regions.
|
||||
#[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
|
||||
CloseRegions(Vec<RegionIdent>),
|
||||
/// Upgrades a region.
|
||||
UpgradeRegion(UpgradeRegion),
|
||||
/// Downgrades a region.
|
||||
@@ -438,8 +455,10 @@ impl Display for UpgradeRegionReply {
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum InstructionReply {
|
||||
OpenRegion(SimpleReply),
|
||||
CloseRegion(SimpleReply),
|
||||
#[serde(alias = "open_region")]
|
||||
OpenRegions(SimpleReply),
|
||||
#[serde(alias = "close_region")]
|
||||
CloseRegions(SimpleReply),
|
||||
UpgradeRegion(UpgradeRegionReply),
|
||||
DowngradeRegion(DowngradeRegionReply),
|
||||
FlushRegions(FlushRegionReply),
|
||||
@@ -448,8 +467,8 @@ pub enum InstructionReply {
|
||||
impl Display for InstructionReply {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
|
||||
Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
|
||||
Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
|
||||
Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
|
||||
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
|
||||
Self::DowngradeRegion(reply) => {
|
||||
write!(f, "InstructionReply::DowngradeRegion({})", reply)
|
||||
@@ -459,13 +478,30 @@ impl Display for InstructionReply {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
impl InstructionReply {
|
||||
pub fn expect_close_regions_reply(self) -> SimpleReply {
|
||||
match self {
|
||||
Self::CloseRegions(reply) => reply,
|
||||
_ => panic!("Expected CloseRegions reply"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn expect_open_regions_reply(self) -> SimpleReply {
|
||||
match self {
|
||||
Self::OpenRegions(reply) => reply,
|
||||
_ => panic!("Expected OpenRegions reply"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_serialize_instruction() {
|
||||
let open_region = Instruction::OpenRegion(OpenRegion::new(
|
||||
let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
|
||||
RegionIdent {
|
||||
datanode_id: 2,
|
||||
table_id: 1024,
|
||||
@@ -476,30 +512,78 @@ mod tests {
|
||||
HashMap::new(),
|
||||
HashMap::new(),
|
||||
false,
|
||||
));
|
||||
)]);
|
||||
|
||||
let serialized = serde_json::to_string(&open_region).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
|
||||
r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
|
||||
serialized
|
||||
);
|
||||
|
||||
let close_region = Instruction::CloseRegion(RegionIdent {
|
||||
let close_region = Instruction::CloseRegions(vec![RegionIdent {
|
||||
datanode_id: 2,
|
||||
table_id: 1024,
|
||||
region_number: 1,
|
||||
engine: "mito2".to_string(),
|
||||
});
|
||||
}]);
|
||||
|
||||
let serialized = serde_json::to_string(&close_region).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
|
||||
r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
|
||||
serialized
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_deserialize_instruction() {
|
||||
let open_region_instruction = r#"{"OpenRegion":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#;
|
||||
let open_region_instruction: Instruction =
|
||||
serde_json::from_str(open_region_instruction).unwrap();
|
||||
let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
|
||||
RegionIdent {
|
||||
datanode_id: 2,
|
||||
table_id: 1024,
|
||||
region_number: 1,
|
||||
engine: "mito2".to_string(),
|
||||
},
|
||||
"test/foo",
|
||||
HashMap::new(),
|
||||
HashMap::new(),
|
||||
false,
|
||||
)]);
|
||||
assert_eq!(open_region_instruction, open_region);
|
||||
|
||||
let close_region_instruction = r#"{"CloseRegion":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#;
|
||||
let close_region_instruction: Instruction =
|
||||
serde_json::from_str(close_region_instruction).unwrap();
|
||||
let close_region = Instruction::CloseRegions(vec![RegionIdent {
|
||||
datanode_id: 2,
|
||||
table_id: 1024,
|
||||
region_number: 1,
|
||||
engine: "mito2".to_string(),
|
||||
}]);
|
||||
assert_eq!(close_region_instruction, close_region);
|
||||
|
||||
let close_region_instruction_reply =
|
||||
r#"{"result":true,"error":null,"type":"close_region"}"#;
|
||||
let close_region_instruction_reply: InstructionReply =
|
||||
serde_json::from_str(close_region_instruction_reply).unwrap();
|
||||
let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
|
||||
result: true,
|
||||
error: None,
|
||||
});
|
||||
assert_eq!(close_region_instruction_reply, close_region_reply);
|
||||
|
||||
let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
|
||||
let open_region_instruction_reply: InstructionReply =
|
||||
serde_json::from_str(open_region_instruction_reply).unwrap();
|
||||
let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
|
||||
result: true,
|
||||
error: None,
|
||||
});
|
||||
assert_eq!(open_region_instruction_reply, open_region_reply);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct LegacyOpenRegion {
|
||||
region_ident: RegionIdent,
|
||||
|
||||
@@ -90,7 +90,10 @@ impl HeartbeatTask {
|
||||
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
|
||||
region_alive_keeper.clone(),
|
||||
Arc::new(ParseMailboxMessageHandler),
|
||||
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
|
||||
Arc::new(
|
||||
RegionHeartbeatResponseHandler::new(region_server.clone())
|
||||
.with_open_region_parallelism(opts.init_regions_parallelism),
|
||||
),
|
||||
Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
|
||||
]));
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ pub struct RegionHeartbeatResponseHandler {
|
||||
catchup_tasks: TaskTracker<()>,
|
||||
downgrade_tasks: TaskTracker<()>,
|
||||
flush_tasks: TaskTracker<()>,
|
||||
open_region_parallelism: usize,
|
||||
}
|
||||
|
||||
/// Handler of the instruction.
|
||||
@@ -78,17 +79,29 @@ impl RegionHeartbeatResponseHandler {
|
||||
catchup_tasks: TaskTracker::new(),
|
||||
downgrade_tasks: TaskTracker::new(),
|
||||
flush_tasks: TaskTracker::new(),
|
||||
// Default to half of the number of CPUs.
|
||||
open_region_parallelism: (num_cpus::get() / 2).max(1),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the parallelism for opening regions.
|
||||
pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
|
||||
self.open_region_parallelism = parallelism;
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds the [InstructionHandler].
|
||||
fn build_handler(instruction: Instruction) -> MetaResult<InstructionHandler> {
|
||||
fn build_handler(&self, instruction: Instruction) -> MetaResult<InstructionHandler> {
|
||||
match instruction {
|
||||
Instruction::OpenRegion(open_region) => Ok(Box::new(move |handler_context| {
|
||||
handler_context.handle_open_region_instruction(open_region)
|
||||
})),
|
||||
Instruction::CloseRegion(close_region) => Ok(Box::new(|handler_context| {
|
||||
handler_context.handle_close_region_instruction(close_region)
|
||||
Instruction::OpenRegions(open_regions) => {
|
||||
let open_region_parallelism = self.open_region_parallelism;
|
||||
Ok(Box::new(move |handler_context| {
|
||||
handler_context
|
||||
.handle_open_regions_instruction(open_regions, open_region_parallelism)
|
||||
}))
|
||||
}
|
||||
Instruction::CloseRegions(close_regions) => Ok(Box::new(move |handler_context| {
|
||||
handler_context.handle_close_regions_instruction(close_regions)
|
||||
})),
|
||||
Instruction::DowngradeRegion(downgrade_region) => {
|
||||
Ok(Box::new(move |handler_context| {
|
||||
@@ -109,14 +122,22 @@ impl RegionHeartbeatResponseHandler {
|
||||
#[async_trait]
|
||||
impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
|
||||
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
|
||||
matches!(
|
||||
ctx.incoming_message.as_ref(),
|
||||
Some((_, Instruction::OpenRegion { .. }))
|
||||
| Some((_, Instruction::CloseRegion { .. }))
|
||||
| Some((_, Instruction::DowngradeRegion { .. }))
|
||||
| Some((_, Instruction::UpgradeRegion { .. }))
|
||||
| Some((_, Instruction::FlushRegions { .. }))
|
||||
)
|
||||
matches!(ctx.incoming_message.as_ref(), |Some((
|
||||
_,
|
||||
Instruction::DowngradeRegion { .. },
|
||||
))| Some((
|
||||
_,
|
||||
Instruction::UpgradeRegion { .. }
|
||||
)) | Some((
|
||||
_,
|
||||
Instruction::FlushRegions { .. }
|
||||
)) | Some((
|
||||
_,
|
||||
Instruction::OpenRegions { .. }
|
||||
)) | Some((
|
||||
_,
|
||||
Instruction::CloseRegions { .. }
|
||||
)))
|
||||
}
|
||||
|
||||
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
|
||||
@@ -130,7 +151,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
|
||||
let catchup_tasks = self.catchup_tasks.clone();
|
||||
let downgrade_tasks = self.downgrade_tasks.clone();
|
||||
let flush_tasks = self.flush_tasks.clone();
|
||||
let handler = Self::build_handler(instruction)?;
|
||||
let handler = self.build_handler(instruction)?;
|
||||
let _handle = common_runtime::spawn_global(async move {
|
||||
let reply = handler(HandlerContext {
|
||||
region_server,
|
||||
@@ -176,8 +197,8 @@ mod tests {
|
||||
use crate::tests::mock_region_server;
|
||||
|
||||
pub struct HeartbeatResponseTestEnv {
|
||||
mailbox: MailboxRef,
|
||||
receiver: Receiver<(MessageMeta, InstructionReply)>,
|
||||
pub(crate) mailbox: MailboxRef,
|
||||
pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
|
||||
}
|
||||
|
||||
impl HeartbeatResponseTestEnv {
|
||||
@@ -248,16 +269,16 @@ mod tests {
|
||||
}
|
||||
|
||||
fn close_region_instruction(region_id: RegionId) -> Instruction {
|
||||
Instruction::CloseRegion(RegionIdent {
|
||||
Instruction::CloseRegions(vec![RegionIdent {
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
datanode_id: 2,
|
||||
engine: MITO_ENGINE_NAME.to_string(),
|
||||
})
|
||||
}])
|
||||
}
|
||||
|
||||
fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
|
||||
Instruction::OpenRegion(OpenRegion::new(
|
||||
Instruction::OpenRegions(vec![OpenRegion::new(
|
||||
RegionIdent {
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
@@ -268,7 +289,7 @@ mod tests {
|
||||
HashMap::new(),
|
||||
HashMap::new(),
|
||||
false,
|
||||
))
|
||||
)])
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -303,7 +324,7 @@ mod tests {
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
if let InstructionReply::CloseRegion(reply) = reply {
|
||||
if let InstructionReply::CloseRegions(reply) = reply {
|
||||
assert!(reply.result);
|
||||
assert!(reply.error.is_none());
|
||||
} else {
|
||||
@@ -358,7 +379,7 @@ mod tests {
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
if let InstructionReply::OpenRegion(reply) = reply {
|
||||
if let InstructionReply::OpenRegions(reply) = reply {
|
||||
assert!(reply.result);
|
||||
assert!(reply.error.is_none());
|
||||
} else {
|
||||
@@ -391,7 +412,7 @@ mod tests {
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
if let InstructionReply::OpenRegion(reply) = reply {
|
||||
if let InstructionReply::OpenRegions(reply) = reply {
|
||||
assert!(!reply.result);
|
||||
assert!(reply.error.is_some());
|
||||
} else {
|
||||
|
||||
@@ -14,7 +14,8 @@
|
||||
|
||||
use common_meta::RegionIdent;
|
||||
use common_meta::instruction::{InstructionReply, SimpleReply};
|
||||
use common_telemetry::{tracing, warn};
|
||||
use common_telemetry::warn;
|
||||
use futures::future::join_all;
|
||||
use futures_util::future::BoxFuture;
|
||||
use store_api::region_request::{RegionCloseRequest, RegionRequest};
|
||||
|
||||
@@ -22,35 +23,124 @@ use crate::error;
|
||||
use crate::heartbeat::handler::HandlerContext;
|
||||
|
||||
impl HandlerContext {
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) fn handle_close_region_instruction(
|
||||
pub(crate) fn handle_close_regions_instruction(
|
||||
self,
|
||||
region_ident: RegionIdent,
|
||||
region_idents: Vec<RegionIdent>,
|
||||
) -> BoxFuture<'static, Option<InstructionReply>> {
|
||||
Box::pin(async move {
|
||||
let region_id = Self::region_ident_to_region_id(®ion_ident);
|
||||
let request = RegionRequest::Close(RegionCloseRequest {});
|
||||
let result = self.region_server.handle_request(region_id, request).await;
|
||||
let region_ids = region_idents
|
||||
.into_iter()
|
||||
.map(|region_ident| Self::region_ident_to_region_id(®ion_ident))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
match result {
|
||||
Ok(_) => Some(InstructionReply::CloseRegion(SimpleReply {
|
||||
let futs = region_ids.iter().map(|region_id| {
|
||||
self.region_server
|
||||
.handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
|
||||
});
|
||||
|
||||
let results = join_all(futs).await;
|
||||
|
||||
let mut errors = vec![];
|
||||
for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) {
|
||||
match result {
|
||||
Ok(_) => (),
|
||||
Err(error::Error::RegionNotFound { .. }) => {
|
||||
warn!(
|
||||
"Received a close regions instruction from meta, but target region:{} is not found.",
|
||||
region_id
|
||||
);
|
||||
}
|
||||
Err(err) => errors.push(format!("region:{region_id}: {err:?}")),
|
||||
}
|
||||
}
|
||||
|
||||
if errors.is_empty() {
|
||||
return Some(InstructionReply::CloseRegions(SimpleReply {
|
||||
result: true,
|
||||
error: None,
|
||||
})),
|
||||
Err(error::Error::RegionNotFound { .. }) => {
|
||||
warn!(
|
||||
"Received a close region instruction from meta, but target region:{region_id} is not found."
|
||||
);
|
||||
Some(InstructionReply::CloseRegion(SimpleReply {
|
||||
result: true,
|
||||
error: None,
|
||||
}))
|
||||
}
|
||||
Err(err) => Some(InstructionReply::CloseRegion(SimpleReply {
|
||||
result: false,
|
||||
error: Some(format!("{err:?}")),
|
||||
})),
|
||||
}));
|
||||
}
|
||||
|
||||
Some(InstructionReply::CloseRegions(SimpleReply {
|
||||
result: false,
|
||||
error: Some(errors.join("; ")),
|
||||
}))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches;
|
||||
use std::sync::Arc;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use common_meta::RegionIdent;
|
||||
use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
|
||||
use common_meta::heartbeat::mailbox::MessageMeta;
|
||||
use common_meta::instruction::Instruction;
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
use mito2::test_util::{CreateRequestBuilder, TestEnv};
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::heartbeat::handler::RegionHeartbeatResponseHandler;
|
||||
use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
|
||||
use crate::tests::mock_region_server;
|
||||
|
||||
fn close_regions_instruction(region_ids: impl IntoIterator<Item = RegionId>) -> Instruction {
|
||||
let region_idents = region_ids
|
||||
.into_iter()
|
||||
.map(|region_id| RegionIdent {
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
datanode_id: 2,
|
||||
engine: MITO_ENGINE_NAME.to_string(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Instruction::CloseRegions(region_idents)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_close_regions() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut region_server = mock_region_server();
|
||||
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
|
||||
let mut engine_env = TestEnv::with_prefix("close-regions").await;
|
||||
let engine = engine_env.create_engine(MitoConfig::default()).await;
|
||||
region_server.register_engine(Arc::new(engine.clone()));
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
let region_id1 = RegionId::new(1024, 2);
|
||||
|
||||
let builder = CreateRequestBuilder::new();
|
||||
let create_req = builder.build();
|
||||
region_server
|
||||
.handle_request(region_id, RegionRequest::Create(create_req))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let create_req1 = builder.build();
|
||||
region_server
|
||||
.handle_request(region_id1, RegionRequest::Create(create_req1))
|
||||
.await
|
||||
.unwrap();
|
||||
let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
|
||||
let instruction =
|
||||
close_regions_instruction([region_id, region_id1, RegionId::new(1024, 3)]);
|
||||
let mut heartbeat_env = HeartbeatResponseTestEnv::new();
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Continue);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
let reply = reply.expect_close_regions_reply();
|
||||
assert!(reply.result);
|
||||
assert!(reply.error.is_none());
|
||||
assert!(!engine.is_region_exists(region_id));
|
||||
assert!(!engine.is_region_exists(region_id1));
|
||||
assert!(!engine.is_region_exists(RegionId::new(1024, 3)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,39 +16,146 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
|
||||
use common_meta::wal_options_allocator::prepare_wal_options;
|
||||
use futures_util::future::BoxFuture;
|
||||
use store_api::path_utils::table_dir;
|
||||
use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
|
||||
use store_api::region_request::{PathType, RegionOpenRequest};
|
||||
|
||||
use crate::heartbeat::handler::HandlerContext;
|
||||
|
||||
impl HandlerContext {
|
||||
pub(crate) fn handle_open_region_instruction(
|
||||
pub(crate) fn handle_open_regions_instruction(
|
||||
self,
|
||||
OpenRegion {
|
||||
region_ident,
|
||||
region_storage_path,
|
||||
mut region_options,
|
||||
region_wal_options,
|
||||
skip_wal_replay,
|
||||
}: OpenRegion,
|
||||
open_regions: Vec<OpenRegion>,
|
||||
open_region_parallelism: usize,
|
||||
) -> BoxFuture<'static, Option<InstructionReply>> {
|
||||
Box::pin(async move {
|
||||
let region_id = Self::region_ident_to_region_id(®ion_ident);
|
||||
prepare_wal_options(&mut region_options, region_id, ®ion_wal_options);
|
||||
let request = RegionRequest::Open(RegionOpenRequest {
|
||||
engine: region_ident.engine,
|
||||
table_dir: table_dir(®ion_storage_path, region_id.table_id()),
|
||||
path_type: PathType::Bare,
|
||||
options: region_options,
|
||||
skip_wal_replay,
|
||||
checkpoint: None,
|
||||
});
|
||||
let result = self.region_server.handle_request(region_id, request).await;
|
||||
let requests = open_regions
|
||||
.into_iter()
|
||||
.map(|open_region| {
|
||||
let OpenRegion {
|
||||
region_ident,
|
||||
region_storage_path,
|
||||
mut region_options,
|
||||
region_wal_options,
|
||||
skip_wal_replay,
|
||||
} = open_region;
|
||||
let region_id = Self::region_ident_to_region_id(®ion_ident);
|
||||
prepare_wal_options(&mut region_options, region_id, ®ion_wal_options);
|
||||
let request = RegionOpenRequest {
|
||||
engine: region_ident.engine,
|
||||
table_dir: table_dir(®ion_storage_path, region_id.table_id()),
|
||||
path_type: PathType::Bare,
|
||||
options: region_options,
|
||||
skip_wal_replay,
|
||||
checkpoint: None,
|
||||
};
|
||||
(region_id, request)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let result = self
|
||||
.region_server
|
||||
.handle_batch_open_requests(open_region_parallelism, requests, false)
|
||||
.await;
|
||||
let success = result.is_ok();
|
||||
let error = result.as_ref().map_err(|e| format!("{e:?}")).err();
|
||||
Some(InstructionReply::OpenRegion(SimpleReply {
|
||||
Some(InstructionReply::OpenRegions(SimpleReply {
|
||||
result: success,
|
||||
error,
|
||||
}))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::RegionIdent;
|
||||
use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
|
||||
use common_meta::heartbeat::mailbox::MessageMeta;
|
||||
use common_meta::instruction::{Instruction, OpenRegion};
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
use mito2::test_util::{CreateRequestBuilder, TestEnv};
|
||||
use store_api::path_utils::table_dir;
|
||||
use store_api::region_request::{RegionCloseRequest, RegionRequest};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::heartbeat::handler::RegionHeartbeatResponseHandler;
|
||||
use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
|
||||
use crate::tests::mock_region_server;
|
||||
|
||||
fn open_regions_instruction(
|
||||
region_ids: impl IntoIterator<Item = RegionId>,
|
||||
storage_path: &str,
|
||||
) -> Instruction {
|
||||
let region_idents = region_ids
|
||||
.into_iter()
|
||||
.map(|region_id| OpenRegion {
|
||||
region_ident: RegionIdent {
|
||||
datanode_id: 0,
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
engine: MITO_ENGINE_NAME.to_string(),
|
||||
},
|
||||
region_storage_path: storage_path.to_string(),
|
||||
region_options: HashMap::new(),
|
||||
region_wal_options: HashMap::new(),
|
||||
skip_wal_replay: false,
|
||||
})
|
||||
.collect();
|
||||
|
||||
Instruction::OpenRegions(region_idents)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_regions() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut region_server = mock_region_server();
|
||||
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
|
||||
let mut engine_env = TestEnv::with_prefix("open-regions").await;
|
||||
let engine = engine_env.create_engine(MitoConfig::default()).await;
|
||||
region_server.register_engine(Arc::new(engine.clone()));
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
let region_id1 = RegionId::new(1024, 2);
|
||||
let storage_path = "test";
|
||||
let builder = CreateRequestBuilder::new();
|
||||
let mut create_req = builder.build();
|
||||
create_req.table_dir = table_dir(storage_path, region_id.table_id());
|
||||
region_server
|
||||
.handle_request(region_id, RegionRequest::Create(create_req))
|
||||
.await
|
||||
.unwrap();
|
||||
let mut create_req1 = builder.build();
|
||||
create_req1.table_dir = table_dir(storage_path, region_id1.table_id());
|
||||
region_server
|
||||
.handle_request(region_id1, RegionRequest::Create(create_req1))
|
||||
.await
|
||||
.unwrap();
|
||||
region_server
|
||||
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
|
||||
.await
|
||||
.unwrap();
|
||||
region_server
|
||||
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
|
||||
let instruction = open_regions_instruction([region_id, region_id1], storage_path);
|
||||
let mut heartbeat_env = HeartbeatResponseTestEnv::new();
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Continue);
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
let reply = reply.expect_open_regions_reply();
|
||||
assert!(reply.result);
|
||||
assert!(reply.error.is_none());
|
||||
|
||||
assert!(engine.is_region_exists(region_id));
|
||||
assert!(engine.is_region_exists(region_id1));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,12 +80,12 @@ impl CloseDowngradedRegion {
|
||||
|
||||
let RegionInfo { engine, .. } = datanode_table_value.region_info.clone();
|
||||
|
||||
Ok(Instruction::CloseRegion(RegionIdent {
|
||||
Ok(Instruction::CloseRegions(vec![RegionIdent {
|
||||
datanode_id: downgrade_leader_datanode_id,
|
||||
table_id,
|
||||
region_number,
|
||||
engine,
|
||||
}))
|
||||
}]))
|
||||
}
|
||||
|
||||
/// Closes the downgraded leader region.
|
||||
@@ -121,7 +121,7 @@ impl CloseDowngradedRegion {
|
||||
"Received close downgraded leade region reply: {:?}, region: {}",
|
||||
reply, region_id
|
||||
);
|
||||
let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
|
||||
let InstructionReply::CloseRegions(SimpleReply { result, error }) = reply else {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: "expect close region reply",
|
||||
|
||||
@@ -78,7 +78,7 @@ impl OpenCandidateRegion {
|
||||
engine,
|
||||
} = datanode_table_value.region_info.clone();
|
||||
|
||||
let open_instruction = Instruction::OpenRegion(OpenRegion::new(
|
||||
let open_instruction = Instruction::OpenRegions(vec![OpenRegion::new(
|
||||
RegionIdent {
|
||||
datanode_id: candidate_id,
|
||||
table_id,
|
||||
@@ -89,7 +89,7 @@ impl OpenCandidateRegion {
|
||||
region_options,
|
||||
region_wal_options,
|
||||
true,
|
||||
));
|
||||
)]);
|
||||
|
||||
Ok(open_instruction)
|
||||
}
|
||||
@@ -155,7 +155,7 @@ impl OpenCandidateRegion {
|
||||
region_id,
|
||||
now.elapsed()
|
||||
);
|
||||
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
|
||||
let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: "expect open region reply",
|
||||
@@ -215,7 +215,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
|
||||
Instruction::OpenRegion(OpenRegion {
|
||||
Instruction::OpenRegions(vec![OpenRegion {
|
||||
region_ident: RegionIdent {
|
||||
datanode_id,
|
||||
table_id: region_id.table_id(),
|
||||
@@ -226,7 +226,7 @@ mod tests {
|
||||
region_options: Default::default(),
|
||||
region_wal_options: Default::default(),
|
||||
skip_wal_replay: true,
|
||||
})
|
||||
}])
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -95,8 +95,11 @@ pub fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> Ma
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error }))
|
||||
.unwrap(),
|
||||
serde_json::to_string(&InstructionReply::OpenRegions(SimpleReply {
|
||||
result,
|
||||
error,
|
||||
}))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
@@ -157,7 +160,7 @@ pub fn new_close_region_reply(id: u64) -> MailboxMessage {
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply {
|
||||
serde_json::to_string(&InstructionReply::CloseRegions(SimpleReply {
|
||||
result: false,
|
||||
error: None,
|
||||
}))
|
||||
|
||||
Reference in New Issue
Block a user