feat: add downgrade leader region step (#2792)

* feat: add downgrade leader region step

* chore: apply suggestions from CR

* chore: rename exist to exists

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-11-29 18:17:28 +09:00
committed by GitHub
parent 616eb04914
commit cce5edc88e
9 changed files with 661 additions and 71 deletions

View File

@@ -48,6 +48,27 @@ impl Display for RegionIdent {
}
}
/// The result of downgrade leader region.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionReply {
/// Returns the `last_entry_id` if available.
pub last_entry_id: Option<u64>,
/// Indicates whether the region exists.
pub exists: bool,
/// Return error if any during the operation.
pub error: Option<String>,
}
impl Display for DowngradeRegionReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"(last_entry_id={:?}, exists={}, error={:?})",
self.last_entry_id, self.exists, self.error
)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
pub result: bool,
@@ -87,10 +108,23 @@ impl OpenRegion {
}
}
/// The instruction of downgrading leader region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DowngradeRegion {
pub region_id: RegionId,
}
impl Display for DowngradeRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DowngradeRegion(region_id={})", self.region_id)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
pub enum Instruction {
OpenRegion(OpenRegion),
CloseRegion(RegionIdent),
DowngradeRegion(DowngradeRegion),
InvalidateTableIdCache(TableId),
InvalidateTableNameCache(TableName),
}
@@ -101,6 +135,7 @@ pub enum InstructionReply {
OpenRegion(SimpleReply),
CloseRegion(SimpleReply),
InvalidateTableCache(SimpleReply),
DowngradeRegion(DowngradeRegionReply),
}
impl Display for InstructionReply {
@@ -111,6 +146,9 @@ impl Display for InstructionReply {
Self::InvalidateTableCache(reply) => {
write!(f, "InstructionReply::Invalidate({})", reply)
}
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
}
}
}

View File

@@ -65,6 +65,10 @@ impl RegionHeartbeatResponseHandler {
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
InvalidHeartbeatResponseSnafu.fail()
}
Instruction::DowngradeRegion(_) => {
// TODO(weny): add it later.
todo!()
}
}
}
@@ -88,6 +92,10 @@ impl RegionHeartbeatResponseHandler {
error: None,
})
}
Instruction::DowngradeRegion(_) => {
// TODO(weny): add it later.
todo!()
}
}
}
@@ -114,6 +122,10 @@ impl RegionHeartbeatResponseHandler {
reply.result = success;
reply.error = error;
}
InstructionReply::DowngradeRegion(_) => {
// TODO(weny): add it later.
todo!()
}
}
template

View File

@@ -19,9 +19,11 @@ pub(crate) mod open_candidate_region;
#[cfg(test)]
pub(crate) mod test_util;
pub(crate) mod update_metadata;
pub(crate) mod upgrade_candidate_region;
use std::any::Any;
use std::fmt::Debug;
use std::time::Duration;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
@@ -34,6 +36,7 @@ use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use serde::{Deserialize, Serialize};
use snafu::{location, Location, OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::time::Instant;
use self::migration_start::RegionMigrationStart;
use crate::error::{self, Error, Result};
@@ -80,6 +83,29 @@ pub struct VolatileContext {
opening_region_guard: Option<OpeningRegionGuard>,
/// `table_route_info` is stored via previous steps for future use.
table_route_info: Option<DeserializedValueWithBytes<TableRouteValue>>,
/// The deadline of leader region lease.
leader_region_lease_deadline: Option<Instant>,
/// The last_entry_id of leader region.
leader_region_last_entry_id: Option<u64>,
}
impl VolatileContext {
/// Sets the `leader_region_lease_deadline` if it does not exist.
pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
if self.leader_region_lease_deadline.is_none() {
self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
}
}
/// Resets the `leader_region_lease_deadline`.
pub fn reset_leader_region_lease_deadline(&mut self) {
self.leader_region_lease_deadline = None;
}
/// Sets the `leader_region_last_entry_id`.
pub fn set_last_entry_id(&mut self, last_entry_id: u64) {
self.leader_region_last_entry_id = Some(last_entry_id)
}
}
/// Used to generate new [Context].

View File

@@ -13,23 +13,506 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tokio::time::sleep;
use crate::error::Result;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
const DOWNGRADE_LEADER_REGION_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion;
pub struct DowngradeLeaderRegion {
// The optimistic retry times.
optimistic_retry: usize,
// The retry initial interval.
retry_initial_interval: Duration,
}
impl Default for DowngradeLeaderRegion {
fn default() -> Self {
Self {
optimistic_retry: 3,
retry_initial_interval: Duration::from_millis(500),
}
}
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
todo!()
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
// Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
self.downgrade_region_with_retry(ctx).await;
// Safety: must exist.
if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
tokio::time::sleep_until(*deadline).await;
}
Ok(Box::new(UpgradeCandidateRegion))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl DowngradeLeaderRegion {
/// Builds downgrade region instruction.
fn build_downgrade_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::DowngradeRegion(DowngradeRegion { region_id })
}
/// Tries to downgrade a leader region.
///
/// Retry:
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
/// - Failed to downgrade region on the Datanode.
///
/// Abort:
/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
/// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
/// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
/// - Invalid JSON.
async fn downgrade_region(
&self,
ctx: &mut Context,
downgrade_instruction: &Instruction,
) -> Result<()> {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
let leader = &pc.from_peer;
let msg = MailboxMessage::json_message(
&format!("Downgrade leader region: {}", region_id),
&format!("Meta@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
downgrade_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: downgrade_instruction.to_string(),
})?;
let ch = Channel::Datanode(leader.id);
let receiver = ctx
.mailbox
.send(&ch, msg, DOWNGRADE_LEADER_REGION_TIMEOUT)
.await?;
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
let InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists,
error,
}) = reply
else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect downgrade region reply",
}
.fail();
};
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to downgrade the region {} on Datanode {:?}, error: {:?}",
region_id, leader, error
),
}
.fail();
}
if !exists {
warn!(
"Trying to downgrade the region {} on Datanode {}, but region doesn't exist!",
region_id, leader
);
}
if let Some(last_entry_id) = last_entry_id {
ctx.volatile_ctx.set_last_entry_id(last_entry_id);
}
Ok(())
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for downgrade leader region {region_id} on Datanode {:?}",
leader,
);
error::RetryLaterSnafu { reason }.fail()
}
Err(err) => Err(err),
}
}
/// Downgrades a leader region.
///
/// Fast path:
/// - Waits for the reply of downgrade instruction.
///
/// Slow path:
/// - Waits for the lease of the leader region expired.
async fn downgrade_region_with_retry(&self, ctx: &mut Context) {
let instruction = self.build_downgrade_region_instruction(ctx);
let mut retry = 0;
loop {
if let Err(err) = self.downgrade_region(ctx, &instruction).await {
retry += 1;
if err.is_retryable() && retry < self.optimistic_retry {
warn!("Failed to downgrade region, error: {err:?}, retry later");
sleep(self.retry_initial_interval).await;
} else {
break;
}
} else {
// Resets the deadline.
ctx.volatile_ctx.reset_leader_region_lease_deadline();
break;
}
}
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::mailbox_message::Payload;
use common_meta::peer::Peer;
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use tokio::time::Instant;
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{
new_close_region_reply, send_mock_reply, TestingEnv,
};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
}
}
fn new_downgrade_region_reply(
id: u64,
last_entry_id: Option<u64>,
exist: bool,
error: Option<String>,
) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists: exist,
error,
}))
.unwrap(),
)),
}
}
#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = DowngradeLeaderRegion::default();
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let instruction = &state.build_downgrade_region_instruction(&ctx);
let err = state
.downgrade_region(&mut ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::PusherNotFound { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_pusher_dropped() {
let state = DowngradeLeaderRegion::default();
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.await;
drop(rx);
let instruction = &state.build_downgrade_region_instruction(&ctx);
let err = state
.downgrade_region(&mut ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::PushMessage { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_unexpected_instruction_reply() {
let state = DowngradeLeaderRegion::default();
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.await;
// Sends an incorrect reply.
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
let instruction = &state.build_downgrade_region_instruction(&ctx);
let err = state
.downgrade_region(&mut ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_instruction_exceeded_deadline() {
let state = DowngradeLeaderRegion::default();
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.await;
send_mock_reply(mailbox, rx, |id| {
Err(error::MailboxTimeoutSnafu { id }.build())
});
let instruction = &state.build_downgrade_region_instruction(&ctx);
let err = state
.downgrade_region(&mut ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::RetryLater { .. });
assert!(err.is_retryable());
}
#[tokio::test]
async fn test_downgrade_region_failed() {
let state = DowngradeLeaderRegion::default();
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.await;
send_mock_reply(mailbox, rx, |id| {
Ok(new_downgrade_region_reply(
id,
None,
false,
Some("test mocked".to_string()),
))
});
let instruction = &state.build_downgrade_region_instruction(&ctx);
let err = state
.downgrade_region(&mut ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::RetryLater { .. });
assert!(err.is_retryable());
assert!(err.to_string().contains("test mocked"));
}
#[tokio::test]
async fn test_downgrade_region_with_retry_fast_path() {
let state = DowngradeLeaderRegion::default();
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.await;
common_runtime::spawn_bg(async move {
// retry: 0.
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(
reply_id,
Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
)
.await
.unwrap();
// retry: 1.
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(
reply_id,
Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
)
.await
.unwrap();
});
state.downgrade_region_with_retry(&mut ctx).await;
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
}
#[tokio::test]
async fn test_downgrade_region_with_retry_slow_path() {
let state = DowngradeLeaderRegion {
optimistic_retry: 3,
retry_initial_interval: Duration::from_millis(100),
};
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.await;
common_runtime::spawn_bg(async move {
for _ in 0..3 {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(
reply_id,
Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
)
.await
.unwrap();
}
});
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(5));
let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
state.downgrade_region_with_retry(&mut ctx).await;
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
// Should remain no change.
assert_eq!(
ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
expected_deadline
)
}
#[tokio::test]
async fn test_next_upgrade_candidate_state() {
let mut state = Box::<DowngradeLeaderRegion>::default();
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(from_peer_id, tx)
.await;
send_mock_reply(mailbox, rx, |id| {
Ok(new_downgrade_region_reply(id, Some(1), true, None))
});
let timer = Instant::now();
let next = state.next(&mut ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2);
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
let _ = next
.as_any()
.downcast_ref::<UpgradeCandidateRegion>()
.unwrap();
}
}

View File

@@ -47,7 +47,7 @@ impl State for RegionMigrationStart {
if self.check_leader_region_on_peer(&region_route, to_peer)? {
Ok(Box::new(RegionMigrationEnd))
} else if self.check_candidate_region_on_peer(&region_route, to_peer) {
Ok(Box::new(DowngradeLeaderRegion))
Ok(Box::<DowngradeLeaderRegion>::default())
} else {
Ok(Box::new(OpenCandidateRegion))
}

View File

@@ -41,7 +41,7 @@ impl State for OpenCandidateRegion {
let instruction = self.build_open_region_instruction(ctx).await?;
self.open_candidate_region(ctx, instruction).await?;
Ok(Box::new(DowngradeLeaderRegion))
Ok(Box::<DowngradeLeaderRegion>::default())
}
fn as_any(&self) -> &dyn Any {
@@ -197,7 +197,9 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::test_util::{
new_close_region_reply, send_mock_reply, TestingEnv,
};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
fn new_persistent_context() -> PersistentContext {
@@ -223,23 +225,6 @@ mod tests {
})
}
fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply {
result: false,
error: None,
}))
.unwrap(),
)),
}
}
fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage {
MailboxMessage {
id,
@@ -328,21 +313,14 @@ mod tests {
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.await;
// Sends an incorrect reply.
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(reply_id, Ok(new_close_region_reply(reply_id)))
.await
.unwrap();
});
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
@@ -368,23 +346,15 @@ mod tests {
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.await;
// Sends an timeout error.
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(
reply_id,
Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
)
.await
.unwrap();
send_mock_reply(mailbox, rx, |id| {
Err(error::MailboxTimeoutSnafu { id }.build())
});
let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
@@ -411,26 +381,18 @@ mod tests {
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.await;
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(
reply_id,
Ok(new_open_region_reply(
reply_id,
false,
Some("test mocked".to_string()),
)),
)
.await
.unwrap();
send_mock_reply(mailbox, rx, |id| {
Ok(new_open_region_reply(
id,
false,
Some("test mocked".to_string()),
))
});
let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
@@ -471,20 +433,13 @@ mod tests {
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.await;
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(reply_id, Ok(new_open_region_reply(reply_id, true, None)))
.await
.unwrap();
});
send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
let next = state.next(&mut ctx).await.unwrap();
let vc = ctx.volatile_ctx;

View File

@@ -14,20 +14,26 @@
use std::sync::Arc;
use api::v1::meta::{HeartbeatResponse, RequestHeader};
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId};
use common_procedure_test::MockContextProvider;
use tokio::sync::mpsc::Sender;
use common_time::util::current_time_millis;
use tokio::sync::mpsc::{Receiver, Sender};
use super::ContextFactoryImpl;
use crate::error::Result;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef};
use crate::service::mailbox::{Channel, MailboxRef};
pub type MockHeartbeatReceiver = Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>;
/// The context of mailbox.
pub struct MailboxContext {
mailbox: MailboxRef,
@@ -120,3 +126,32 @@ impl TestingEnv {
}
}
}
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply {
result: false,
error: None,
}))
.unwrap(),
)),
}
}
pub fn send_mock_reply(
mailbox: MailboxRef,
mut rx: MockHeartbeatReceiver,
msg: impl FnOnce(u64) -> Result<MailboxMessage> + Send + 'static,
) {
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap();
});
}

View File

@@ -13,7 +13,9 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::rpc::router::RegionStatus;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -36,7 +38,7 @@ impl State for UpdateMetadata {
UpdateMetadata::Downgrade => {
self.downgrade_leader_region(ctx).await?;
Ok(Box::new(DowngradeLeaderRegion))
Ok(Box::<DowngradeLeaderRegion>::default())
}
}
}
@@ -88,6 +90,9 @@ impl UpdateMetadata {
debug_assert!(ctx.remove_table_route_value());
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
Ok(())
}
}

View File

@@ -0,0 +1,36 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::procedure::region_migration::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpgradeCandidateRegion {
async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
todo!();
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl UpgradeCandidateRegion {}