feat: add open candidate region step (#2757)

* feat: add open candidate region state

* feat: register the opening region

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-11-20 12:36:00 +09:00
committed by GitHub
parent ce959ddd3f
commit 4fcda272fb
5 changed files with 575 additions and 6 deletions

View File

@@ -16,9 +16,11 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::JoinError;
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
@@ -29,6 +31,17 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Another procedure is opening the region: {} on peer: {}",
region_id,
peer_id
))]
RegionOpeningRace {
location: Location,
peer_id: DatanodeId,
region_id: RegionId,
},
#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
location: Location,
@@ -625,7 +638,8 @@ impl ErrorExt for Error {
| Error::UnexpectedInstructionReply { .. }
| Error::Unexpected { .. }
| Error::Txn { .. }
| Error::TableIdChanged { .. } => StatusCode::Unexpected,
| Error::TableIdChanged { .. }
| Error::RegionOpeningRace { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),

View File

@@ -36,6 +36,8 @@ use store_api::storage::RegionId;
use self::migration_start::RegionMigrationStart;
use crate::error::{Error, Result};
use crate::procedure::utils::region_lock_key;
use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef};
use crate::service::mailbox::MailboxRef;
/// It's shared in each step and available even after recovering.
///
@@ -66,7 +68,15 @@ impl PersistentContext {
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {}
pub struct VolatileContext {
/// `opening_region_guard` will be set after the
/// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
///
/// `opening_region_guard` should be consumed after
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue) .
opening_region_guard: Option<OpeningRegionGuard>,
}
/// Used to generate new [Context].
pub trait ContextFactory {
@@ -77,6 +87,9 @@ pub trait ContextFactory {
pub struct ContextFactoryImpl {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
}
impl ContextFactory for ContextFactoryImpl {
@@ -85,6 +98,9 @@ impl ContextFactory for ContextFactoryImpl {
persistent_ctx,
volatile_ctx: self.volatile_ctx,
table_metadata_manager: self.table_metadata_manager,
opening_region_keeper: self.opening_region_keeper,
mailbox: self.mailbox,
server_addr: self.server_addr,
}
}
}
@@ -96,12 +112,15 @@ pub struct Context {
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
}
impl Context {
/// Returns address of meta server.
pub fn server_addr(&self) -> &str {
todo!()
&self.server_addr
}
}

View File

@@ -13,11 +13,23 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::ddl::utils::region_storage_path;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::RegionIdent;
use serde::{Deserialize, Serialize};
use snafu::{location, Location, OptionExt, ResultExt};
use crate::error::Result;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
@@ -25,11 +37,465 @@ pub struct OpenCandidateRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for OpenCandidateRegion {
async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> {
todo!()
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
let instruction = self.build_open_region_instruction(ctx).await?;
self.open_candidate_region(ctx, instruction).await?;
Ok(Box::new(DowngradeLeaderRegion))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl OpenCandidateRegion {
/// Builds open region instructions
///
/// Abort(non-retry):
/// - Table Info is not found.
async fn build_open_region_instruction(&self, ctx: &Context) -> Result<Instruction> {
let pc = &ctx.persistent_ctx;
let cluster_id = pc.cluster_id;
let table_id = pc.region_id.table_id();
let region_number = pc.region_id.region_number();
let candidate = &pc.to_peer;
let table_info = ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(|e| error::Error::RetryLater {
reason: e.to_string(),
location: location!(),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?
.into_inner()
.table_info;
// The region storage path is immutable after the region is created.
// Therefore, it's safe to store it in `VolatileContext` for future use.
let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
let engine = table_info.meta.engine;
let region_options: HashMap<String, String> = (&table_info.meta.options).into();
let open_instruction = Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
cluster_id,
datanode_id: candidate.id,
table_id,
region_number,
engine,
},
&region_storage_path,
region_options,
));
Ok(open_instruction)
}
/// Opens the candidate region.
///
/// Abort(non-retry):
/// - The Datanode is unreachable(e.g., Candidate pusher is not found).
/// - Unexpected instruction reply.
/// - Another procedure is opening the candidate region.
///
/// Retry:
/// - Exceeded deadline of open instruction.
/// - Datanode failed to open the candidate region.
async fn open_candidate_region(
&self,
ctx: &mut Context,
open_instruction: Instruction,
) -> Result<()> {
let pc = &ctx.persistent_ctx;
let vc = &mut ctx.volatile_ctx;
let region_id = pc.region_id;
let candidate = &pc.to_peer;
// Registers the opening region.
let guard = ctx
.opening_region_keeper
.register(candidate.id, region_id)
.context(error::RegionOpeningRaceSnafu {
peer_id: candidate.id,
region_id,
})?;
debug_assert!(vc.opening_region_guard.is_none());
vc.opening_region_guard = Some(guard);
let msg = MailboxMessage::json_message(
&format!("Open candidate region: {}", region_id),
&format!("Meta@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", candidate.id, candidate.addr),
common_time::util::current_time_millis(),
&open_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: open_instruction.to_string(),
})?;
let ch = Channel::Datanode(candidate.id);
let receiver = ctx
.mailbox
.send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
.await?;
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect open region reply",
}
.fail();
};
if result {
Ok(())
} else {
error::RetryLaterSnafu {
reason: format!(
"Region {region_id} is not opened by Datanode {:?}, error: {error:?}",
candidate,
),
}
.fail()
}
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for open candidate region {region_id} on Datanode {:?}",
candidate,
);
error::RetryLaterSnafu { reason }.fail()
}
Err(e) => Err(e),
}
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::mailbox_message::Payload;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::DatanodeId;
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
}
}
fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
Instruction::OpenRegion(OpenRegion {
region_ident: RegionIdent {
cluster_id: 0,
datanode_id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
engine: MITO2_ENGINE.to_string(),
},
region_storage_path: "/bar/foo/region/".to_string(),
options: Default::default(),
})
}
fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply {
result: false,
error: None,
}))
.unwrap(),
)),
}
}
fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error }))
.unwrap(),
)),
}
}
#[tokio::test]
async fn test_table_info_is_not_found_error() {
let state = OpenCandidateRegion;
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context);
let err = state.build_open_region_instruction(&ctx).await.unwrap_err();
assert_matches!(err, Error::TableInfoNotFound { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = OpenCandidateRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();
assert_matches!(err, Error::PusherNotFound { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_candidate_region_opening_error() {
let state = OpenCandidateRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let opening_region_keeper = env.opening_region_keeper();
let _guard = opening_region_keeper
.register(to_peer_id, region_id)
.unwrap();
let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();
assert_matches!(err, Error::RegionOpeningRace { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_unexpected_instruction_reply() {
let state = OpenCandidateRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.await;
// Sends an incorrect reply.
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(reply_id, Ok(new_close_region_reply(reply_id)))
.await
.unwrap();
});
let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_instruction_exceeded_deadline() {
let state = OpenCandidateRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.await;
// Sends an timeout error.
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(
reply_id,
Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
)
.await
.unwrap();
});
let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();
assert_matches!(err, Error::RetryLater { .. });
assert!(err.is_retryable());
}
#[tokio::test]
async fn test_open_candidate_region_failed() {
let state = OpenCandidateRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.await;
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(
reply_id,
Ok(new_open_region_reply(
reply_id,
false,
Some("test mocked".to_string()),
)),
)
.await
.unwrap();
});
let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();
assert_matches!(err, Error::RetryLater { .. });
assert!(err.is_retryable());
assert!(err.to_string().contains("test mocked"));
}
#[tokio::test]
async fn test_next_downgrade_leader_region_state() {
let mut state = Box::new(OpenCandidateRegion);
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();
// Prepares table
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(persistent_context.region_id),
leader_peer: Some(Peer::empty(3)),
..Default::default()
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(to_peer_id, tx)
.await;
common_runtime::spawn_bg(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
.on_recv(reply_id, Ok(new_open_region_reply(reply_id, true, None)))
.await
.unwrap();
});
let next = state.next(&mut ctx).await.unwrap();
let vc = ctx.volatile_ctx;
assert_eq!(
vc.opening_region_guard.unwrap().info(),
(to_peer_id, region_id)
);
let _ = next
.as_any()
.downcast_ref::<DowngradeLeaderRegion>()
.unwrap();
}
}

View File

@@ -14,16 +14,57 @@
use std::sync::Arc;
use api::v1::meta::{HeartbeatResponse, RequestHeader};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId};
use common_procedure_test::MockContextProvider;
use tokio::sync::mpsc::Sender;
use super::ContextFactoryImpl;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef};
use crate::service::mailbox::{Channel, MailboxRef};
/// The context of mailbox.
pub struct MailboxContext {
mailbox: MailboxRef,
// The pusher is used in the mailbox.
pushers: Pushers,
}
impl MailboxContext {
pub fn new(sequence: Sequence) -> Self {
let pushers = Pushers::default();
let mailbox = HeartbeatMailbox::create(pushers.clone(), sequence);
Self { mailbox, pushers }
}
/// Inserts a pusher for `datanode_id`
pub async fn insert_heartbeat_response_receiver(
&mut self,
datanode_id: DatanodeId,
tx: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) {
let pusher_id = Channel::Datanode(datanode_id).pusher_id();
let pusher = Pusher::new(tx, &RequestHeader::default());
let _ = self.pushers.insert(pusher_id, pusher).await;
}
pub fn mailbox(&self) -> &MailboxRef {
&self.mailbox
}
}
/// `TestingEnv` provides components during the tests.
pub struct TestingEnv {
table_metadata_manager: TableMetadataManagerRef,
mailbox_ctx: MailboxContext,
opening_region_keeper: OpeningRegionKeeperRef,
server_addr: String,
}
impl TestingEnv {
@@ -32,8 +73,16 @@ impl TestingEnv {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 1, kv_backend.clone());
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());
Self {
table_metadata_manager,
opening_region_keeper,
mailbox_ctx,
server_addr: "localhost".to_string(),
}
}
@@ -41,14 +90,28 @@ impl TestingEnv {
pub fn context_factory(&self) -> ContextFactoryImpl {
ContextFactoryImpl {
table_metadata_manager: self.table_metadata_manager.clone(),
opening_region_keeper: self.opening_region_keeper.clone(),
volatile_ctx: Default::default(),
mailbox: self.mailbox_ctx.mailbox().clone(),
server_addr: self.server_addr.to_string(),
}
}
/// Returns the mutable [MailboxContext].
pub fn mailbox_context(&mut self) -> &mut MailboxContext {
&mut self.mailbox_ctx
}
/// Returns the [TableMetadataManagerRef]
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
/// Returns the [OpeningRegionKeeperRef]
pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef {
&self.opening_region_keeper
}
/// Returns a [ProcedureContext] with a random [ProcedureId] and a [MockContextProvider].
pub fn procedure_context() -> ProcedureContext {
ProcedureContext {

View File

@@ -171,6 +171,13 @@ impl Drop for OpeningRegionGuard {
}
}
impl OpeningRegionGuard {
/// Returns opening region info.
pub fn info(&self) -> (DatanodeId, RegionId) {
(self.datanode_id, self.region_id)
}
}
pub type OpeningRegionKeeperRef = Arc<OpeningRegionKeeper>;
#[derive(Debug, Clone, Default)]