refactor: add support for batch region upgrade operations part1 (#7155)

* refactor: convert UpgradeRegion instruction to batch operation

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: introduce `handle_batch_catchup_requests` fn for mito engine

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: introduce `handle_batch_catchup_requests` fn for metric engine

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: suggestion and add ser/de tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-10-31 11:08:38 +08:00
committed by GitHub
parent 30894d7599
commit 6960a0183a
16 changed files with 887 additions and 178 deletions

View File

@@ -507,13 +507,14 @@ pub enum Instruction {
/// Closes regions.
#[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
CloseRegions(Vec<RegionIdent>),
/// Upgrades a region.
UpgradeRegion(UpgradeRegion),
/// Upgrades regions.
#[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
UpgradeRegions(Vec<UpgradeRegion>),
#[serde(
deserialize_with = "single_or_multiple_from",
alias = "DowngradeRegion"
)]
/// Downgrades a region.
/// Downgrades regions.
DowngradeRegions(Vec<DowngradeRegion>),
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
@@ -559,9 +560,9 @@ impl Instruction {
}
/// Converts the instruction into a [UpgradeRegion].
pub fn into_upgrade_regions(self) -> Option<UpgradeRegion> {
pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
match self {
Self::UpgradeRegion(upgrade_region) => Some(upgrade_region),
Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
_ => None,
}
}
@@ -584,6 +585,10 @@ impl Instruction {
/// The reply of [UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
/// The [RegionId].
/// For compatibility, it is defaulted to [RegionId::new(0, 0)].
#[serde(default)]
pub region_id: RegionId,
/// Returns true if `last_entry_id` has been replayed to the latest.
pub ready: bool,
/// Indicates whether the region exists.
@@ -635,6 +640,39 @@ where
})
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionsReply {
pub replies: Vec<UpgradeRegionReply>,
}
impl UpgradeRegionsReply {
pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
Self { replies }
}
pub fn single(reply: UpgradeRegionReply) -> Self {
Self::new(vec![reply])
}
}
#[derive(Deserialize)]
#[serde(untagged)]
enum UpgradeRegionsCompat {
Single(UpgradeRegionReply),
Multiple(UpgradeRegionsReply),
}
fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
where
D: Deserializer<'de>,
{
let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
Ok(match helper {
UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
UpgradeRegionsCompat::Multiple(reply) => reply,
})
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
@@ -642,7 +680,11 @@ pub enum InstructionReply {
OpenRegions(SimpleReply),
#[serde(alias = "close_region")]
CloseRegions(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
#[serde(
deserialize_with = "upgrade_regions_compat_from",
alias = "upgrade_region"
)]
UpgradeRegions(UpgradeRegionsReply),
#[serde(
alias = "downgrade_region",
deserialize_with = "downgrade_regions_compat_from"
@@ -658,9 +700,11 @@ impl Display for InstructionReply {
match self {
Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
Self::UpgradeRegions(reply) => {
write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
}
Self::DowngradeRegions(reply) => {
write!(f, "InstructionReply::DowngradeRegions({:?})", reply)
write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
}
Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
@@ -685,9 +729,9 @@ impl InstructionReply {
}
}
pub fn expect_upgrade_region_reply(self) -> UpgradeRegionReply {
pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
match self {
Self::UpgradeRegion(reply) => reply,
Self::UpgradeRegions(reply) => reply.replies,
_ => panic!("Expected UpgradeRegion reply"),
}
}
@@ -749,25 +793,58 @@ mod tests {
serialized
);
let downgrade_region = InstructionReply::DowngradeRegions(DowngradeRegionsReply::single(
DowngradeRegionReply {
let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
region_id: RegionId::new(1024, 1),
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: Some(Duration::from_millis(1000)),
location_id: None,
replay_entry_id: None,
metadata_replay_entry_id: None,
}]);
let serialized = serde_json::to_string(&upgrade_region).unwrap();
assert_eq!(
r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
serialized
);
}
#[test]
fn test_serialize_instruction_reply() {
let downgrade_region_reply = InstructionReply::DowngradeRegions(
DowngradeRegionsReply::single(DowngradeRegionReply {
region_id: RegionId::new(1024, 1),
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: None,
},
));
}),
);
let serialized = serde_json::to_string(&downgrade_region).unwrap();
let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
assert_eq!(
r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
serialized
)
);
let upgrade_region_reply =
InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
region_id: RegionId::new(1024, 1),
ready: true,
exists: true,
error: None,
}));
let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
assert_eq!(
r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
serialized
);
}
#[test]
fn test_deserialize_instruction() {
// legacy open region instruction
let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
let open_region_instruction: Instruction =
serde_json::from_str(open_region_instruction).unwrap();
@@ -785,6 +862,7 @@ mod tests {
)]);
assert_eq!(open_region_instruction, open_region);
// legacy close region instruction
let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
let close_region_instruction: Instruction =
serde_json::from_str(close_region_instruction).unwrap();
@@ -796,6 +874,7 @@ mod tests {
}]);
assert_eq!(close_region_instruction, close_region);
// legacy downgrade region instruction
let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
let downgrade_region_instruction: Instruction =
serde_json::from_str(downgrade_region_instruction).unwrap();
@@ -805,6 +884,25 @@ mod tests {
}]);
assert_eq!(downgrade_region_instruction, downgrade_region);
// legacy upgrade region instruction
let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
let upgrade_region_instruction: Instruction =
serde_json::from_str(upgrade_region_instruction).unwrap();
let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
region_id: RegionId::new(1024, 1),
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: Some(Duration::from_millis(1000)),
location_id: None,
replay_entry_id: None,
metadata_replay_entry_id: None,
}]);
assert_eq!(upgrade_region_instruction, upgrade_region);
}
#[test]
fn test_deserialize_instruction_reply() {
// legacy close region reply
let close_region_instruction_reply =
r#"{"result":true,"error":null,"type":"close_region"}"#;
let close_region_instruction_reply: InstructionReply =
@@ -815,6 +913,7 @@ mod tests {
});
assert_eq!(close_region_instruction_reply, close_region_reply);
// legacy open region reply
let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
let open_region_instruction_reply: InstructionReply =
serde_json::from_str(open_region_instruction_reply).unwrap();
@@ -824,6 +923,7 @@ mod tests {
});
assert_eq!(open_region_instruction_reply, open_region_reply);
// legacy downgrade region reply
let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
let downgrade_region_instruction_reply: InstructionReply =
serde_json::from_str(downgrade_region_instruction_reply).unwrap();
@@ -837,6 +937,19 @@ mod tests {
}),
);
assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
// legacy upgrade region reply
let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
let upgrade_region_instruction_reply: InstructionReply =
serde_json::from_str(upgrade_region_instruction_reply).unwrap();
let upgrade_region_reply =
InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
region_id: RegionId::new(1024, 1),
ready: true,
exists: true,
error: None,
}));
assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -114,7 +114,7 @@ impl RegionHeartbeatResponseHandler {
)),
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler.into())),
Instruction::UpgradeRegions(_) => Ok(Box::new(UpgradeRegionsHandler.into())),
Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
@@ -194,7 +194,7 @@ dispatch_instr!(
OpenRegions => OpenRegions,
FlushRegions => FlushRegions,
DowngradeRegions => DowngradeRegions,
UpgradeRegion => UpgradeRegions,
UpgradeRegions => UpgradeRegions,
GetFileRefs => GetFileRefs,
GcRegions => GcRegions,
);
@@ -334,10 +334,10 @@ mod tests {
);
// Upgrade region
let instruction = Instruction::UpgradeRegion(UpgradeRegion {
let instruction = Instruction::UpgradeRegions(vec![UpgradeRegion {
region_id,
..Default::default()
});
}]);
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
);

View File

@@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_meta::instruction::{
InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
};
use common_telemetry::{info, warn};
use futures::future::join_all;
use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
@@ -22,115 +25,151 @@ use crate::heartbeat::task_tracker::WaitResult;
#[derive(Debug, Clone, Copy, Default)]
pub struct UpgradeRegionsHandler;
impl UpgradeRegionsHandler {
// Handles upgrade regions instruction.
//
// Returns batch of upgrade region replies, the order of the replies is not guaranteed.
async fn handle_upgrade_regions(
&self,
ctx: &HandlerContext,
upgrade_regions: Vec<UpgradeRegion>,
) -> Vec<UpgradeRegionReply> {
let mut replies = Vec::with_capacity(upgrade_regions.len());
let mut catchup_request = Vec::with_capacity(upgrade_regions.len());
for upgrade_region in upgrade_regions {
let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id)
else {
replies.push(UpgradeRegionReply {
region_id: upgrade_region.region_id,
ready: false,
exists: false,
error: None,
});
continue;
};
if writable {
replies.push(UpgradeRegionReply {
region_id: upgrade_region.region_id,
ready: true,
exists: true,
error: None,
});
} else {
let UpgradeRegion {
last_entry_id,
metadata_last_entry_id,
location_id,
replay_entry_id,
metadata_replay_entry_id,
replay_timeout,
..
} = upgrade_region;
let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
(Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
entry_id,
metadata_entry_id,
}),
_ => None,
};
catchup_request.push((
upgrade_region.region_id,
replay_timeout.unwrap_or_default(),
RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
metadata_entry_id: metadata_last_entry_id,
location_id,
checkpoint,
},
));
}
}
let mut wait_results = Vec::with_capacity(catchup_request.len());
for (region_id, replay_timeout, catchup_request) in catchup_request {
let region_server_moved = ctx.region_server.clone();
// TODO(weny): parallelize the catchup tasks.
let result = ctx
.catchup_tasks
.try_register(
region_id,
Box::pin(async move {
info!(
"Executing region: {region_id} catchup to: last entry id {:?}",
catchup_request.entry_id
);
region_server_moved
.handle_request(region_id, RegionRequest::Catchup(catchup_request))
.await?;
Ok(())
}),
)
.await;
if result.is_busy() {
warn!("Another catchup task is running for the region: {region_id}");
}
// We don't care that it returns a newly registered or running task.
let mut watcher = result.into_watcher();
wait_results.push((
region_id,
ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await,
));
}
let results = join_all(
wait_results
.into_iter()
.map(|(region_id, result)| async move {
match result {
WaitResult::Timeout => UpgradeRegionReply {
region_id,
ready: false,
exists: true,
error: None,
},
WaitResult::Finish(Ok(_)) => UpgradeRegionReply {
region_id,
ready: true,
exists: true,
error: None,
},
WaitResult::Finish(Err(err)) => UpgradeRegionReply {
region_id,
ready: false,
exists: true,
error: Some(format!("{err:?}")),
},
}
}),
)
.await;
replies.extend(results.into_iter());
replies
}
}
#[async_trait::async_trait]
impl InstructionHandler for UpgradeRegionsHandler {
type Instruction = UpgradeRegion;
type Instruction = Vec<UpgradeRegion>;
async fn handle(
&self,
ctx: &HandlerContext,
UpgradeRegion {
region_id,
last_entry_id,
metadata_last_entry_id,
replay_timeout,
location_id,
replay_entry_id,
metadata_replay_entry_id,
}: UpgradeRegion,
upgrade_regions: Self::Instruction,
) -> Option<InstructionReply> {
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: false,
error: None,
}));
};
let replies = self.handle_upgrade_regions(ctx, upgrade_regions).await;
if writable {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}));
}
let region_server_moved = ctx.region_server.clone();
let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
(Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
entry_id,
metadata_entry_id,
}),
_ => None,
};
// The catchup task is almost zero cost if the inside region is writable.
// Therefore, it always registers a new catchup task.
let register_result = ctx
.catchup_tasks
.try_register(
region_id,
Box::pin(async move {
info!(
"Executing region: {region_id} catchup to: last entry id {last_entry_id:?}"
);
region_server_moved
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
metadata_entry_id: metadata_last_entry_id,
location_id,
checkpoint,
}),
)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another catchup task is running for the region: {region_id}");
}
// Returns immediately
let Some(replay_timeout) = replay_timeout else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
}));
};
// We don't care that it returns a newly registered or running task.
let mut watcher = register_result.into_watcher();
let result = ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await;
match result {
WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
})),
WaitResult::Finish(Ok(_)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}))
}
WaitResult::Finish(Err(err)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: Some(format!("{err:?}")),
}))
}
}
Some(InstructionReply::UpgradeRegions(UpgradeRegionsReply::new(
replies,
)))
}
}
@@ -164,15 +203,15 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
vec![UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
},
}],
)
.await;
let reply = reply.unwrap().expect_upgrade_region_reply();
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
assert!(!reply.exists);
assert!(reply.error.is_none());
}
@@ -201,15 +240,15 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
vec![UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
},
}],
)
.await;
let reply = reply.unwrap().expect_upgrade_region_reply();
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -239,15 +278,15 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
vec![UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
},
}],
)
.await;
let reply = reply.unwrap().expect_upgrade_region_reply();
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -280,15 +319,15 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
vec![UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
},
}],
)
.await;
let reply = reply.unwrap().expect_upgrade_region_reply();
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -298,17 +337,17 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
vec![UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(500)),
..Default::default()
},
}],
)
.await;
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);
let reply = reply.unwrap().expect_upgrade_region_reply();
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -339,15 +378,15 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
vec![UpgradeRegion {
region_id,
..Default::default()
},
}],
)
.await;
// It didn't wait for handle returns; it had no idea about the error.
let reply = reply.unwrap().expect_upgrade_region_reply();
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -355,18 +394,18 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
vec![UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(200)),
..Default::default()
},
}],
)
.await;
let reply = reply.unwrap().expect_upgrade_region_reply();
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_some());
assert!(reply.error.unwrap().contains("mock_error"));
assert!(reply.error.as_ref().unwrap().contains("mock_error"));
}
}

View File

@@ -17,7 +17,9 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::ddl::utils::parse_region_wal_options;
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_meta::instruction::{
Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
};
use common_meta::lock_key::RemoteWalLock;
use common_meta::wal_options_allocator::extract_topic_from_wal_options;
use common_procedure::{Context as ProcedureContext, Status};
@@ -131,7 +133,7 @@ impl UpgradeCandidateRegion {
None
};
let upgrade_instruction = Instruction::UpgradeRegion(
let upgrade_instruction = Instruction::UpgradeRegions(vec![
UpgradeRegion {
region_id,
last_entry_id,
@@ -143,7 +145,7 @@ impl UpgradeCandidateRegion {
}
.with_replay_entry_id(checkpoint.map(|c| c.entry_id))
.with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
);
]);
Ok(upgrade_instruction)
}
@@ -193,11 +195,7 @@ impl UpgradeCandidateRegion {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
let InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready,
exists,
error,
}) = reply
let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
@@ -205,6 +203,13 @@ impl UpgradeCandidateRegion {
}
.fail();
};
// TODO(weny): handle multiple replies.
let UpgradeRegionReply {
ready,
exists,
error,
..
} = &replies[0];
// Notes: The order of handling is important.
if error.is_some() {

View File

@@ -18,7 +18,7 @@ use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
use common_meta::instruction::{
DowngradeRegionReply, DowngradeRegionsReply, FlushRegionReply, InstructionReply, SimpleReply,
UpgradeRegionReply,
UpgradeRegionReply, UpgradeRegionsReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
@@ -212,11 +212,14 @@ pub fn new_upgrade_region_reply(
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready,
exists,
error,
}))
serde_json::to_string(&InstructionReply::UpgradeRegions(
UpgradeRegionsReply::single(UpgradeRegionReply {
region_id: RegionId::new(0, 0),
ready,
exists,
error,
}),
))
.unwrap(),
)),
}

View File

@@ -46,7 +46,9 @@ use store_api::region_engine::{
RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
use store_api::region_request::{
BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use crate::config::EngineConfig;
@@ -142,6 +144,17 @@ impl RegionEngine for MetricEngine {
.map_err(BoxedError::new)
}
async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<BatchResponses, BoxedError> {
self.inner
.handle_batch_catchup_requests(parallelism, requests)
.await
.map_err(BoxedError::new)
}
async fn handle_batch_ddl_requests(
&self,
batch_request: BatchRegionDdlRequest,

View File

@@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use snafu::{OptionExt, ResultExt};
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{BatchResponses, RegionEngine};
use store_api::region_request::{
AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint,
};
@@ -22,11 +25,101 @@ use store_api::storage::RegionId;
use crate::engine::MetricEngineInner;
use crate::error::{
MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
BatchCatchupMitoRegionSnafu, MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
UnsupportedRegionRequestSnafu,
};
use crate::utils;
impl MetricEngineInner {
pub async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<BatchResponses> {
let mut all_requests = Vec::with_capacity(requests.len() * 2);
let mut physical_region_options_list = Vec::with_capacity(requests.len());
for (region_id, req) in requests {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let physical_region_options = *self
.state
.read()
.unwrap()
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.options();
physical_region_options_list.push((data_region_id, physical_region_options));
all_requests.push((
metadata_region_id,
RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: req.metadata_entry_id,
metadata_entry_id: None,
location_id: req.location_id,
checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
entry_id: c.metadata_entry_id.unwrap_or_default(),
metadata_entry_id: None,
}),
},
));
all_requests.push((
data_region_id,
RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: req.entry_id,
metadata_entry_id: None,
location_id: req.location_id,
checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
entry_id: c.entry_id,
metadata_entry_id: None,
}),
},
));
}
let mut results = self
.mito
.handle_batch_catchup_requests(parallelism, all_requests)
.await
.context(BatchCatchupMitoRegionSnafu {})?
.into_iter()
.collect::<HashMap<_, _>>();
let mut responses = Vec::with_capacity(physical_region_options_list.len());
for (physical_region_id, physical_region_options) in physical_region_options_list {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata_region_result = results.remove(&metadata_region_id);
let data_region_result = results.remove(&data_region_id);
// Pass the optional `metadata_region_result` and `data_region_result` to
// `recover_physical_region_with_results`. This function handles errors for each
// catchup physical region request, allowing the process to continue with the
// remaining regions even if some requests fail.
let response = self
.recover_physical_region_with_results(
metadata_region_result,
data_region_result,
physical_region_id,
physical_region_options,
// Note: We intentionally dont close the region if recovery fails.
// Closing it here might confuse the region server since it links RegionIds to Engines.
// If recovery didnt succeed, the region should stay open.
false,
)
.await
.map_err(BoxedError::new);
responses.push((physical_region_id, response));
}
Ok(responses)
}
pub async fn catchup_region(
&self,
region_id: RegionId,

View File

@@ -72,17 +72,19 @@ impl MetricEngineInner {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata_region_result = results.remove(&metadata_region_id);
let data_region_result = results.remove(&data_region_id);
let data_region_result: Option<std::result::Result<RegionResponse, BoxedError>> =
results.remove(&data_region_id);
// Pass the optional `metadata_region_result` and `data_region_result` to
// `open_physical_region_with_results`. This function handles errors for each
// `recover_physical_region_with_results`. This function handles errors for each
// open physical region request, allowing the process to continue with the
// remaining regions even if some requests fail.
let response = self
.open_physical_region_with_results(
.recover_physical_region_with_results(
metadata_region_result,
data_region_result,
physical_region_id,
physical_region_options,
true,
)
.await
.map_err(BoxedError::new);
@@ -107,12 +109,13 @@ impl MetricEngineInner {
}
}
async fn open_physical_region_with_results(
pub(crate) async fn recover_physical_region_with_results(
&self,
metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
physical_region_id: RegionId,
physical_region_options: PhysicalRegionOptions,
close_region_on_failure: bool,
) -> Result<RegionResponse> {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let data_region_id = utils::to_data_region_id(physical_region_id);
@@ -136,8 +139,10 @@ impl MetricEngineInner {
.recover_states(physical_region_id, physical_region_options)
.await
{
self.close_physical_region_on_recovery_failure(physical_region_id)
.await;
if close_region_on_failure {
self.close_physical_region_on_recovery_failure(physical_region_id)
.await;
}
return Err(err);
}
Ok(data_region_response)

View File

@@ -50,6 +50,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to batch catchup mito region"))]
BatchCatchupMitoRegion {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("No open region result for region {}", region_id))]
NoOpenRegionResult {
region_id: RegionId,
@@ -361,7 +368,8 @@ impl ErrorExt for Error {
| MitoFlushOperation { source, .. }
| MitoDeleteOperation { source, .. }
| MitoSyncOperation { source, .. }
| BatchOpenMitoRegion { source, .. } => source.status_code(),
| BatchOpenMitoRegion { source, .. }
| BatchCatchupMitoRegion { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),

View File

@@ -21,6 +21,8 @@ mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_catchup_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod bump_committed_sequence_test;
@@ -91,7 +93,7 @@ use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
@@ -100,7 +102,9 @@ use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::region_request::{
AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};
@@ -772,6 +776,122 @@ impl EngineInner {
Ok(responses)
}
async fn catchup_topic_regions(
&self,
provider: Provider,
region_requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
let now = Instant::now();
let region_ids = region_requests
.iter()
.map(|(region_id, _)| *region_id)
.collect::<Vec<_>>();
let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
provider.clone(),
self.wal_raw_entry_reader.clone(),
&region_ids,
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
);
let mut responses = Vec::with_capacity(region_requests.len());
for ((region_id, request), entry_receiver) in
region_requests.into_iter().zip(entry_receivers)
{
let (request, receiver) =
WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
self.workers.submit_to_worker(region_id, request).await?;
responses.push(async move { receiver.await.context(RecvSnafu)? });
}
// Wait for entries distribution.
let distribution =
common_runtime::spawn_global(async move { distributor.distribute().await });
// Wait for worker returns.
let responses = join_all(responses).await;
distribution.await.context(JoinSnafu)??;
let num_failure = responses.iter().filter(|r| r.is_err()).count();
info!(
"Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
region_ids.len() - num_failure,
// Safety: provider is kafka provider.
provider.as_kafka_provider().unwrap(),
num_failure,
now.elapsed(),
);
Ok(region_ids.into_iter().zip(responses).collect())
}
async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
let mut responses = Vec::with_capacity(requests.len());
let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
let mut remaining_region_requests = vec![];
for (region_id, request) in requests {
match self.workers.get_region(region_id) {
Some(region) => match region.provider.as_kafka_provider() {
Some(provider) => {
topic_regions
.entry(provider.clone())
.or_default()
.push((region_id, request));
}
None => {
remaining_region_requests.push((region_id, request));
}
},
None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
}
}
let semaphore = Arc::new(Semaphore::new(parallelism));
if !topic_regions.is_empty() {
let mut tasks = Vec::with_capacity(topic_regions.len());
for (provider, region_requests) in topic_regions {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
// Safety: semaphore must exist
let _permit = semaphore_moved.acquire().await.unwrap();
self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
.await
})
}
let r = try_join_all(tasks).await?;
responses.extend(r.into_iter().flatten());
}
if !remaining_region_requests.is_empty() {
let mut tasks = Vec::with_capacity(remaining_region_requests.len());
let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
for (region_id, request) in remaining_region_requests {
let semaphore_moved = semaphore.clone();
region_ids.push(region_id);
tasks.push(async move {
// Safety: semaphore must exist
let _permit = semaphore_moved.acquire().await.unwrap();
let (request, receiver) =
WorkerRequest::new_catchup_region_request(region_id, request, None);
self.workers.submit_to_worker(region_id, request).await?;
receiver.await.context(RecvSnafu)?
})
}
let results = join_all(tasks).await;
responses.extend(region_ids.into_iter().zip(results));
}
Ok(responses)
}
/// Handles [RegionRequest] and return its executed result.
async fn handle_request(
&self,
@@ -914,6 +1034,29 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}
#[tracing::instrument(skip_all)]
async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<BatchResponses, BoxedError> {
self.inner
.handle_batch_catchup_requests(parallelism, requests)
.await
.map(|responses| {
responses
.into_iter()
.map(|(region_id, response)| {
(
region_id,
response.map(RegionResponse::new).map_err(BoxedError::new),
)
})
.collect::<Vec<_>>()
})
.map_err(BoxedError::new)
}
#[tracing::instrument(skip_all)]
async fn handle_request(
&self,

View File

@@ -0,0 +1,239 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
use rstest::rstest;
use rstest_reuse::apply;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{PathType, RegionCatchupRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::test_util::{
CreateRequestBuilder, LogStoreFactory, TestEnv, build_rows, flush_region,
kafka_log_store_factory, prepare_test_for_kafka_log_store, put_rows, rows_schema,
single_kafka_log_store_factory,
};
#[apply(single_kafka_log_store_factory)]
async fn test_batch_catchup(factory: Option<LogStoreFactory>) {
test_batch_catchup_with_format(factory.clone(), false).await;
test_batch_catchup_with_format(factory, true).await;
}
async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_format: bool) {
common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};
let mut env = TestEnv::with_prefix("catchup-batch-regions")
.await
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
// FIXME(weny): change region number to 3.
let num_regions = 2u32;
let table_dir_fn = |region_id| format!("test/{region_id}");
let mut region_schema = HashMap::new();
for id in 1..=num_regions {
let engine = engine.clone();
let topic = topic.clone();
let region_id = RegionId::new(1, id);
let request = CreateRequestBuilder::new()
.table_dir(&table_dir_fn(region_id))
.kafka_topic(topic.clone())
.build();
let column_schemas = rows_schema(&request);
region_schema.insert(region_id, column_schemas);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
}
for i in 0..10 {
for region_number in 1..=num_regions {
let region_id = RegionId::new(1, region_number);
let rows = Rows {
schema: region_schema[&region_id].clone(),
rows: build_rows(
(region_number as usize) * 120 + i as usize,
(region_number as usize) * 120 + i as usize + 1,
),
};
put_rows(&engine, region_id, rows).await;
if i % region_number == 0 {
flush_region(&engine, region_id, None).await;
}
}
}
let assert_result = |engine: MitoEngine| async move {
for i in 1..=num_regions {
let region_id = RegionId::new(1, i);
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let mut expected = String::new();
expected.push_str(
"+-------+---------+---------------------+\n| tag_0 | field_0 | ts |\n+-------+---------+---------------------+\n",
);
for row in 0..10 {
expected.push_str(&format!(
"| {} | {}.0 | 1970-01-01T00:{:02}:{:02} |\n",
i * 120 + row,
i * 120 + row,
2 * i,
row
));
}
expected.push_str("+-------+---------+---------------------+");
assert_eq!(expected, batches.pretty_print().unwrap());
}
};
assert_result(engine.clone()).await;
// Reopen engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
},
)
.await;
let mut options = HashMap::new();
if let Some(topic) = &topic {
options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: topic.clone(),
}))
.unwrap(),
);
};
let requests = (1..=num_regions)
.map(|id| {
let region_id = RegionId::new(1, id);
(
region_id,
RegionOpenRequest {
engine: String::new(),
table_dir: table_dir_fn(region_id),
options: options.clone(),
skip_wal_replay: true,
path_type: PathType::Bare,
checkpoint: None,
},
)
})
.collect::<Vec<_>>();
let results = engine
.handle_batch_open_requests(4, requests)
.await
.unwrap();
for (_, result) in results {
assert!(result.is_ok());
}
let requests = (1..=num_regions)
.map(|id| {
let region_id = RegionId::new(1, id);
(
region_id,
RegionCatchupRequest {
set_writable: true,
entry_id: None,
metadata_entry_id: None,
location_id: None,
checkpoint: None,
},
)
})
.collect::<Vec<_>>();
let results = engine
.handle_batch_catchup_requests(4, requests)
.await
.unwrap();
for (_, result) in results {
assert!(result.is_ok());
}
assert_result(engine.clone()).await;
}
#[apply(single_kafka_log_store_factory)]
async fn test_batch_catchup_err(factory: Option<LogStoreFactory>) {
test_batch_catchup_err_with_format(factory.clone(), false).await;
test_batch_catchup_err_with_format(factory, true).await;
}
async fn test_batch_catchup_err_with_format(factory: Option<LogStoreFactory>, flat_format: bool) {
common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};
let mut env = TestEnv::with_prefix("catchup-regions-err")
.await
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let num_regions = 3u32;
let requests = (1..num_regions)
.map(|id| {
let region_id = RegionId::new(1, id);
(
region_id,
RegionCatchupRequest {
set_writable: true,
entry_id: None,
metadata_entry_id: None,
location_id: None,
checkpoint: None,
},
)
})
.collect::<Vec<_>>();
let results = engine
.handle_batch_catchup_requests(4, requests)
.await
.unwrap();
for (_, result) in results {
assert_eq!(
result.unwrap_err().status_code(),
StatusCode::RegionNotFound
);
}
}

View File

@@ -602,6 +602,7 @@ pub(crate) enum WorkerRequest {
}
impl WorkerRequest {
/// Creates a new open region request.
pub(crate) fn new_open_region_request(
region_id: RegionId,
request: RegionOpenRequest,
@@ -618,6 +619,21 @@ impl WorkerRequest {
(worker_request, receiver)
}
/// Creates a new catchup region request.
pub(crate) fn new_catchup_region_request(
region_id: RegionId,
request: RegionCatchupRequest,
entry_receiver: Option<WalEntryReceiver>,
) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
let (sender, receiver) = oneshot::channel();
let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::Catchup((request, entry_receiver)),
});
(worker_request, receiver)
}
/// Converts request from a [RegionRequest].
pub(crate) fn try_from_region_request(
region_id: RegionId,
@@ -701,7 +717,7 @@ impl WorkerRequest {
RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::Catchup(v),
request: DdlRequest::Catchup((v, None)),
}),
RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
metadata: region_metadata,
@@ -757,7 +773,7 @@ pub(crate) enum DdlRequest {
Compact(RegionCompactRequest),
BuildIndex(RegionBuildIndexRequest),
Truncate(RegionTruncateRequest),
Catchup(RegionCatchupRequest),
Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
}
/// Sender and Ddl request.

View File

@@ -1027,7 +1027,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
DdlRequest::Catchup((req, wal_entry_receiver)) => {
self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver)
.await
}
};
ddl.sender.send(res);

View File

@@ -28,6 +28,7 @@ use tokio::time::Instant;
use crate::error::{self, Result};
use crate::region::MitoRegion;
use crate::region::opener::{RegionOpener, replay_memtable};
use crate::wal::entry_distributor::WalEntryReceiver;
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -35,6 +36,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
request: RegionCatchupRequest,
entry_receiver: Option<WalEntryReceiver>,
) -> Result<AffectedRows> {
let Some(region) = self.regions.get_region(region_id) else {
return error::RegionNotFoundSnafu { region_id }.fail();
@@ -76,9 +78,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region.provider
);
let timer = Instant::now();
let wal_entry_reader =
let wal_entry_reader = entry_receiver.map(|r| Box::new(r) as _).unwrap_or_else(|| {
self.wal
.wal_entry_reader(&region.provider, region_id, request.location_id);
.wal_entry_reader(&region.provider, region_id, request.location_id)
});
let on_region_opened = self.wal.on_region_opened();
let last_entry_id = replay_memtable(
&region.provider,

View File

@@ -42,7 +42,7 @@ impl Display for KafkaProvider {
}
// The Provider of raft engine log store
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RaftEngineProvider {
pub id: u64,
}
@@ -59,7 +59,7 @@ impl RaftEngineProvider {
}
/// The Provider of LogStore
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Provider {
RaftEngine(RaftEngineProvider),
Kafka(Arc<KafkaProvider>),

View File

@@ -34,7 +34,9 @@ use tokio::sync::Semaphore;
use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
use crate::region_request::{
BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use crate::storage::{RegionId, ScanRequest, SequenceNumber};
/// The settable region role state.
@@ -715,6 +717,30 @@ pub trait RegionEngine: Send + Sync {
Ok(join_all(tasks).await)
}
async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<BatchResponses, BoxedError> {
let semaphore = Arc::new(Semaphore::new(parallelism));
let mut tasks = Vec::with_capacity(requests.len());
for (region_id, request) in requests {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
// Safety: semaphore must exist
let _permit = semaphore_moved.acquire().await.unwrap();
let result = self
.handle_request(region_id, RegionRequest::Catchup(request))
.await;
(region_id, result)
});
}
Ok(join_all(tasks).await)
}
async fn handle_batch_ddl_requests(
&self,
request: BatchRegionDdlRequest,