chore: remove original region failover implementation (#4237)

chore: remove original region failure implementation
This commit is contained in:
Weny Xu
2024-07-05 17:03:46 +09:00
committed by GitHub
parent 0b624dc337
commit 4f0984c1d7
13 changed files with 0 additions and 2795 deletions

View File

@@ -13,7 +13,6 @@
// limitations under the License.
pub mod etcd;
pub(crate) mod keys;
pub(crate) mod memory;
use std::sync::Arc;

View File

@@ -1,28 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! All keys used for distributed locking in the Metasrv.
//! Place them in this unified module for better maintenance.
use common_meta::RegionIdent;
use crate::lock::Key;
pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key {
format!(
"table_metadata_lock_({}-{})",
region.cluster_id, region.table_id,
)
.into_bytes()
}

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(weny): remove it.
#[allow(unused)]
pub mod region_failover;
pub mod region_migration;
#[cfg(test)]
mod tests;

View File

@@ -1,844 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod activate_region;
mod deactivate_region;
mod failover_end;
mod failover_start;
mod invalidate_cache;
mod update_metadata;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use async_trait::async_trait;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{
watcher, Context as ProcedureContext, LockKey, Procedure, ProcedureManagerRef, ProcedureWithId,
Status,
};
use common_telemetry::{error, info, warn};
use failover_start::RegionFailoverStart;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use table::table_name::TableName;
use crate::error::{
self, KvBackendSnafu, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu,
};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::service::mailbox::MailboxRef;
const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
/// A key for the preventing running multiple failover procedures for the same region.
#[derive(PartialEq, Eq, Hash, Clone)]
pub(crate) struct RegionFailoverKey {
pub(crate) cluster_id: ClusterId,
pub(crate) table_id: TableId,
pub(crate) region_number: RegionNumber,
}
impl From<RegionIdent> for RegionFailoverKey {
fn from(region_ident: RegionIdent) -> Self {
Self {
cluster_id: region_ident.cluster_id,
table_id: region_ident.table_id,
region_number: region_ident.region_number,
}
}
}
pub(crate) struct RegionFailoverManager {
region_lease_secs: u64,
in_memory: ResettableKvBackendRef,
kv_backend: KvBackendRef,
mailbox: MailboxRef,
procedure_manager: ProcedureManagerRef,
selector: SelectorRef,
selector_ctx: SelectorContext,
dist_lock: DistLockRef,
running_procedures: Arc<RwLock<HashSet<RegionFailoverKey>>>,
table_metadata_manager: TableMetadataManagerRef,
}
struct FailoverProcedureGuard {
running_procedures: Arc<RwLock<HashSet<RegionFailoverKey>>>,
key: RegionFailoverKey,
}
impl Drop for FailoverProcedureGuard {
fn drop(&mut self) {
let _ = self.running_procedures.write().unwrap().remove(&self.key);
}
}
impl RegionFailoverManager {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
region_lease_secs: u64,
in_memory: ResettableKvBackendRef,
kv_backend: KvBackendRef,
mailbox: MailboxRef,
procedure_manager: ProcedureManagerRef,
(selector, selector_ctx): (SelectorRef, SelectorContext),
dist_lock: DistLockRef,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
region_lease_secs,
in_memory,
kv_backend,
mailbox,
procedure_manager,
selector,
selector_ctx,
dist_lock,
running_procedures: Arc::new(RwLock::new(HashSet::new())),
table_metadata_manager,
}
}
pub(crate) fn create_context(&self) -> RegionFailoverContext {
RegionFailoverContext {
region_lease_secs: self.region_lease_secs,
in_memory: self.in_memory.clone(),
kv_backend: self.kv_backend.clone(),
mailbox: self.mailbox.clone(),
selector: self.selector.clone(),
selector_ctx: self.selector_ctx.clone(),
dist_lock: self.dist_lock.clone(),
table_metadata_manager: self.table_metadata_manager.clone(),
}
}
pub(crate) fn try_start(&self) -> Result<()> {
let context = self.create_context();
self.procedure_manager
.register_loader(
RegionFailoverProcedure::TYPE_NAME,
Box::new(move |json| {
let context = context.clone();
RegionFailoverProcedure::from_json(json, context).map(|p| Box::new(p) as _)
}),
)
.context(RegisterProcedureLoaderSnafu {
type_name: RegionFailoverProcedure::TYPE_NAME,
})
}
fn insert_running_procedures(
&self,
failed_region: &RegionIdent,
) -> Option<FailoverProcedureGuard> {
let key = RegionFailoverKey::from(failed_region.clone());
let mut procedures = self.running_procedures.write().unwrap();
if procedures.insert(key.clone()) {
Some(FailoverProcedureGuard {
running_procedures: self.running_procedures.clone(),
key,
})
} else {
None
}
}
pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
self.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
.await
.context(KvBackendSnafu)
}
pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
let Some(guard) = self.insert_running_procedures(failed_region) else {
warn!("Region failover procedure for region {failed_region} is already running!");
return Ok(());
};
let table_info = self
.table_metadata_manager
.table_info_manager()
.get(failed_region.table_id)
.await
.context(error::TableMetadataManagerSnafu)?;
if table_info.is_none() {
// The table could be dropped before the failure detector knows it. Then the region
// failover is not needed.
// Or the table could be renamed. But we will have a new region ident to detect failure.
// So the region failover here is not needed either.
return Ok(());
}
if !self.failed_region_exists(failed_region).await? {
// The failed region could be failover by another procedure.
return Ok(());
}
let context = self.create_context();
// Safety: Check before.
let table_info = table_info.unwrap();
let TableName {
catalog_name,
schema_name,
..
} = table_info.table_name();
let procedure =
RegionFailoverProcedure::new(catalog_name, schema_name, failed_region.clone(), context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region failover procedure {procedure_id} for region {failed_region:?}");
let procedure_manager = self.procedure_manager.clone();
let failed_region = failed_region.clone();
let _handle = common_runtime::spawn_bg(async move {
let _ = guard;
let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
Ok(watcher) => watcher,
Err(e) => {
error!(e; "Failed to submit region failover procedure {procedure_id} for region {failed_region}");
return;
}
};
if let Err(e) = watcher::wait(watcher).await {
error!(e; "Failed to wait region failover procedure {procedure_id} for region {failed_region}");
return;
}
info!("Region failover procedure {procedure_id} for region {failed_region} is finished successfully!");
});
Ok(())
}
async fn failed_region_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
let table_id = failed_region.table_id;
let datanode_id = failed_region.datanode_id;
let value = self
.table_metadata_manager
.datanode_table_manager()
.get(&DatanodeTableKey::new(datanode_id, table_id))
.await
.context(TableMetadataManagerSnafu)?;
Ok(value
.map(|value| {
value
.regions
.iter()
.any(|region| *region == failed_region.region_number)
})
.unwrap_or_default())
}
}
#[derive(Serialize, Deserialize, Debug)]
struct LockMeta {
catalog: String,
schema: String,
}
/// A "Node" in the state machine of region failover procedure.
/// Contains the current state and the data.
#[derive(Serialize, Deserialize, Debug)]
struct Node {
lock_meta: LockMeta,
failed_region: RegionIdent,
state: Box<dyn State>,
}
/// The "Context" of region failover procedure state machine.
#[derive(Clone)]
pub struct RegionFailoverContext {
pub region_lease_secs: u64,
pub in_memory: ResettableKvBackendRef,
pub kv_backend: KvBackendRef,
pub mailbox: MailboxRef,
pub selector: SelectorRef,
pub selector_ctx: SelectorContext,
pub dist_lock: DistLockRef,
pub table_metadata_manager: TableMetadataManagerRef,
}
/// The state machine of region failover procedure. Driven by the call to `next`.
#[async_trait]
#[typetag::serde(tag = "region_failover_state")]
trait State: Sync + Send + Debug {
async fn next(
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>>;
fn status(&self) -> Status {
Status::executing(true)
}
}
/// The states transition of region failover procedure:
///
/// ```text
/// ┌───────────────────┐
/// │RegionFailoverStart│
/// └─────────┬─────────┘
/// │
/// │ Selects a candidate(Datanode)
/// ┌─────────┐ │ to place the failed region
/// │ │ │
/// If replied with │ ┌───▼────▼───────┐
/// "Close region │ │DeactivateRegion│
/// failed" │ └───┬────┬───────┘
/// │ │ │
/// └─────────┘ │ Sends "Close Region" request
/// │ to the failed Datanode, and
/// | wait for the Region lease expiry
/// ┌─────────┐ │ seconds
/// │ │ │
/// │ ┌──▼────▼──────┐
/// Wait candidate │ │ActivateRegion◄───────────────────────┐
/// response timeout │ └──┬────┬──────┘ │
/// │ │ │ │
/// └─────────┘ │ Sends "Open Region" request │
/// │ to the candidate Datanode, │
/// │ and wait for 30 seconds │
/// │ │
/// │ Check Datanode returns │
/// │ │
/// success ├──────────────────────────────┘
/// │ failed
/// ┌─────────▼──────────┐
/// │UpdateRegionMetadata│
/// └─────────┬──────────┘
/// │
/// │ Updates the Region
/// │ placement metadata
/// │
/// ┌───────▼───────┐
/// │InvalidateCache│
/// └───────┬───────┘
/// │
/// │ Broadcast Invalidate Table
/// │ Cache
/// │
/// ┌────────▼────────┐
/// │RegionFailoverEnd│
/// └─────────────────┘
/// ```
pub struct RegionFailoverProcedure {
node: Node,
context: RegionFailoverContext,
}
impl RegionFailoverProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::RegionFailover";
pub fn new(
catalog: String,
schema: String,
failed_region: RegionIdent,
context: RegionFailoverContext,
) -> Self {
let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta { catalog, schema },
failed_region,
state: Box::new(state),
};
Self { node, context }
}
fn from_json(json: &str, context: RegionFailoverContext) -> ProcedureResult<Self> {
let node: Node = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { node, context })
}
}
#[async_trait]
impl Procedure for RegionFailoverProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.node.state;
*state = state
.next(&self.context, &self.node.failed_region)
.await
.map_err(|e| {
if e.is_retryable() {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
Ok(state.status())
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.node).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let region_ident = &self.node.failed_region;
let lock_key = vec![
CatalogLock::Read(&self.node.lock_meta.catalog).into(),
SchemaLock::read(&self.node.lock_meta.catalog, &self.node.lock_meta.catalog).into(),
TableLock::Read(region_ident.table_id).into(),
RegionLock::Write(RegionId::new(
region_ident.table_id,
region_ident.region_number,
))
.into(),
];
LockKey::new(lock_key)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Mutex;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::ddl::utils::region_storage_path;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::sequence::SequenceBuilder;
use common_meta::DatanodeId;
use common_procedure::{BoxedProcedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use rand::prelude::SliceRandom;
use tokio::sync::mpsc::Receiver;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::lock::memory::MemLock;
use crate::selector::{Namespace, Selector, SelectorOptions};
use crate::service::mailbox::Channel;
use crate::test_util;
struct RandomNodeSelector {
nodes: Vec<Peer>,
}
#[async_trait]
impl Selector for RandomNodeSelector {
type Context = SelectorContext;
type Output = Vec<Peer>;
async fn select(
&self,
_ns: Namespace,
_ctx: &Self::Context,
_opts: SelectorOptions,
) -> Result<Self::Output> {
let mut rng = rand::thread_rng();
let mut nodes = self.nodes.clone();
nodes.shuffle(&mut rng);
Ok(nodes)
}
}
pub struct TestingEnv {
pub context: RegionFailoverContext,
pub heartbeat_receivers: HashMap<DatanodeId, Receiver<tonic::Result<HeartbeatResponse>>>,
pub pushers: Pushers,
pub path: String,
}
impl TestingEnv {
pub async fn failed_region(&self, region_number: u32) -> RegionIdent {
let region_distribution = self
.context
.table_metadata_manager
.table_route_manager()
.get_region_distribution(1)
.await
.unwrap()
.unwrap();
let failed_datanode = region_distribution
.iter()
.find_map(|(&datanode_id, regions)| {
if regions.contains(&region_number) {
Some(datanode_id)
} else {
None
}
})
.unwrap();
RegionIdent {
cluster_id: 0,
region_number,
datanode_id: failed_datanode,
table_id: 1,
engine: "mito2".to_string(),
}
}
}
pub struct TestingEnvBuilder {
selector: Option<SelectorRef>,
}
impl TestingEnvBuilder {
pub fn new() -> Self {
Self { selector: None }
}
fn with_selector(mut self, selector: SelectorRef) -> Self {
self.selector = Some(selector);
self
}
pub async fn build(self) -> TestingEnv {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(Arc::new(MemoryKvBackend::new()))
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
let table_id = 1;
let table = "my_table";
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
test_util::prepare_table_region_and_info_value(&table_metadata_manager, table).await;
let region_distribution = table_metadata_manager
.table_route_manager()
.get_region_distribution(1)
.await
.unwrap()
.unwrap();
let pushers = Pushers::default();
let mut heartbeat_receivers = HashMap::with_capacity(3);
for datanode_id in 1..=3 {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let pusher_id = Channel::Datanode(datanode_id).pusher_id();
let pusher = Pusher::new(tx, &RequestHeader::default());
let _ = pushers.insert(pusher_id, pusher).await;
let _ = heartbeat_receivers.insert(datanode_id, rx);
}
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone())
.initial(0)
.step(100)
.build();
let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence);
let selector = self.selector.unwrap_or_else(|| {
let nodes = (1..=region_distribution.len())
.map(|id| Peer {
id: id as u64,
addr: String::default(),
})
.collect();
Arc::new(RandomNodeSelector { nodes })
});
let selector_ctx = SelectorContext {
datanode_lease_secs: 10,
flownode_lease_secs: 10,
server_addr: "127.0.0.1:3002".to_string(),
kv_backend: kv_backend.clone(),
meta_peer_client,
table_id: Some(table_id),
};
TestingEnv {
context: RegionFailoverContext {
region_lease_secs: 10,
in_memory,
kv_backend,
mailbox,
selector,
selector_ctx,
dist_lock: Arc::new(MemLock::default()),
table_metadata_manager,
},
pushers,
heartbeat_receivers,
path: region_storage_path(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME).to_string(),
}
}
}
#[tokio::test]
async fn test_region_failover_procedure() {
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let mut procedure = Box::new(RegionFailoverProcedure::new(
"greptime".into(),
"public".into(),
failed_region.clone(),
env.context.clone(),
)) as BoxedProcedure;
let mut failed_datanode = env
.heartbeat_receivers
.remove(&failed_region.datanode_id)
.unwrap();
let mailbox_clone = env.context.mailbox.clone();
let failed_region_clone = failed_region.clone();
let _handle = common_runtime::spawn_bg(async move {
let resp = failed_datanode.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::CloseRegion(failed_region_clone.clone()))
.unwrap(),
))
);
// simulating response from Datanode
mailbox_clone
.on_recv(
1,
Ok(MailboxMessage {
id: 1,
subject: "Deactivate Region".to_string(),
from: format!("Datanode-{}", failed_region.datanode_id),
to: "Metasrv".to_string(),
timestamp_millis: common_time::util::current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
}))
.unwrap(),
)),
}),
)
.await
.unwrap();
});
let (candidate_tx, mut candidate_rx) = tokio::sync::mpsc::channel(1);
for (datanode_id, mut recv) in env.heartbeat_receivers.into_iter() {
let mailbox_clone = env.context.mailbox.clone();
let opening_region = RegionIdent {
datanode_id,
..failed_region.clone()
};
let path = env.path.to_string();
let candidate_tx = candidate_tx.clone();
let _handle = common_runtime::spawn_bg(async move {
let resp = recv.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new(
opening_region,
&path,
HashMap::new(),
HashMap::new(),
false
)))
.unwrap(),
))
);
candidate_tx.send(datanode_id).await.unwrap();
// simulating response from Datanode
mailbox_clone
.on_recv(
// Very tricky here:
// the procedure only sends two messages in sequence, the second one is
// "Activate Region", and its message id is 2.
2,
Ok(MailboxMessage {
id: 2,
subject: "Activate Region".to_string(),
from: format!("Datanode-{datanode_id}"),
to: "Metasrv".to_string(),
timestamp_millis: common_time::util::current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply {
result: true,
error: None,
}))
.unwrap(),
)),
}),
)
.await
.unwrap();
});
}
common_procedure_test::execute_procedure_until_done(&mut procedure).await;
assert_eq!(
procedure.dump().unwrap(),
r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
);
// Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode.
let region_distribution = env
.context
.table_metadata_manager
.table_route_manager()
.get_region_distribution(1)
.await
.unwrap()
.unwrap();
assert_eq!(
region_distribution.get(&failed_region.datanode_id).unwrap(),
&vec![2]
);
assert!(region_distribution
.get(&candidate_rx.recv().await.unwrap())
.unwrap()
.contains(&1));
}
#[tokio::test]
async fn test_state_serde() {
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta {
catalog: "greptime".into(),
schema: "public".into(),
},
failed_region,
state: Box::new(state),
};
let procedure = RegionFailoverProcedure {
node,
context: env.context,
};
let s = procedure.dump().unwrap();
assert_eq!(
s,
r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#,
);
let n: Node = serde_json::from_str(&s).unwrap();
assert_eq!(
format!("{n:?}"),
r#"Node { lock_meta: LockMeta { catalog: "greptime", schema: "public" }, failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#,
);
}
#[tokio::test]
async fn test_state_not_changed_upon_failure() {
struct MySelector {
peers: Arc<Mutex<Vec<Option<Peer>>>>,
}
#[async_trait]
impl Selector for MySelector {
type Context = SelectorContext;
type Output = Vec<Peer>;
async fn select(
&self,
_ns: Namespace,
_ctx: &Self::Context,
_opts: SelectorOptions,
) -> Result<Self::Output> {
let mut peers = self.peers.lock().unwrap();
Ok(if let Some(Some(peer)) = peers.pop() {
vec![peer]
} else {
vec![]
})
}
}
// Returns a valid peer the second time called "select".
let selector = MySelector {
peers: Arc::new(Mutex::new(vec![
Some(Peer {
id: 42,
addr: String::default(),
}),
None,
])),
};
let env = TestingEnvBuilder::new()
.with_selector(Arc::new(selector))
.build()
.await;
let failed_region = env.failed_region(1).await;
let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta {
catalog: "greptime".into(),
schema: "public".into(),
},
failed_region,
state: Box::new(state),
};
let mut procedure = RegionFailoverProcedure {
node,
context: env.context,
};
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
let result = procedure.execute(&ctx).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.is_retry_later(), "err: {:?}", err);
assert_eq!(
r#"{"region_failover_state":"RegionFailoverStart","failover_candidate":null}"#,
serde_json::to_string(&procedure.node.state).unwrap()
);
let result = procedure.execute(&ctx).await;
assert!(matches!(result, Ok(Status::Executing { persist: true })));
assert_eq!(
r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""}}"#,
serde_json::to_string(&procedure.node.state).unwrap()
);
}
}

View File

@@ -1,328 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::{DatanodeTableKey, RegionInfo};
use common_meta::peer::Peer;
use common_meta::RegionIdent;
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use super::update_metadata::UpdateRegionMetadata;
use super::{RegionFailoverContext, State};
use crate::error::{
self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_failover::OPEN_REGION_MESSAGE_TIMEOUT;
use crate::service::mailbox::{Channel, MailboxReceiver};
#[derive(Serialize, Deserialize, Debug)]
pub(super) struct ActivateRegion {
candidate: Peer,
// If the meta leader node dies during the execution of the procedure,
// the new leader node needs to remark the failed region as "inactive"
// to prevent it from renewing the lease.
remark_inactive_region: bool,
// An `None` option stands for uninitialized.
region_storage_path: Option<String>,
region_options: Option<HashMap<String, String>>,
region_wal_options: Option<HashMap<RegionNumber, String>>,
}
impl ActivateRegion {
pub(super) fn new(candidate: Peer) -> Self {
Self {
candidate,
remark_inactive_region: false,
region_storage_path: None,
region_options: None,
region_wal_options: None,
}
}
async fn send_open_region_message(
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
timeout: Duration,
) -> Result<MailboxReceiver> {
let table_id = failed_region.table_id;
// Retrieves the wal options from failed datanode table value.
let datanode_table_value = ctx
.table_metadata_manager
.datanode_table_manager()
.get(&DatanodeTableKey::new(failed_region.datanode_id, table_id))
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::DatanodeTableNotFoundSnafu {
table_id,
datanode_id: failed_region.datanode_id,
})?;
let candidate_ident = RegionIdent {
datanode_id: self.candidate.id,
..failed_region.clone()
};
info!("Activating region: {candidate_ident:?}");
let RegionInfo {
region_storage_path,
region_options,
region_wal_options,
..
} = datanode_table_value.region_info;
let instruction = Instruction::OpenRegion(OpenRegion::new(
candidate_ident.clone(),
&region_storage_path,
region_options.clone(),
region_wal_options.clone(),
false,
));
self.region_storage_path = Some(region_storage_path);
self.region_options = Some(region_options);
self.region_wal_options = Some(region_wal_options);
let msg = MailboxMessage::json_message(
"Activate Region",
&format!("Metasrv@{}", ctx.selector_ctx.server_addr),
&format!(
"Datanode-(id={}, addr={})",
self.candidate.id, self.candidate.addr
),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let ch = Channel::Datanode(self.candidate.id);
ctx.mailbox.send(&ch, msg, timeout).await
}
async fn handle_response(
&mut self,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
match mailbox_receiver.await? {
Ok(msg) => {
debug!("Received activate region reply: {msg:?}");
let reply = HeartbeatMailbox::json_reply(&msg)?;
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect open region reply",
}
.fail();
};
if result {
Ok(Box::new(UpdateRegionMetadata::new(
self.candidate.clone(),
self.region_storage_path
.clone()
.context(error::UnexpectedSnafu {
violated: "expected region_storage_path",
})?,
self.region_options
.clone()
.context(error::UnexpectedSnafu {
violated: "expected region_options",
})?,
self.region_wal_options
.clone()
.context(error::UnexpectedSnafu {
violated: "expected region_wal_options",
})?,
)))
} else {
// The region could be just indeed cannot be opened by the candidate, retry
// would be in vain. Then why not just end the failover procedure? Because we
// currently lack the methods or any maintenance tools to manage the whole
// procedures things, it would be easier to let the procedure keep running.
let reason = format!(
"Region {failed_region:?} is not opened by Datanode {:?}, error: {error:?}",
self.candidate,
);
RetryLaterSnafu { reason }.fail()
}
}
Err(Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for activate failed region {failed_region:?} on Datanode {:?}",
self.candidate,
);
RetryLaterSnafu { reason }.fail()
}
Err(e) => Err(e),
}
}
}
#[async_trait]
#[typetag::serde]
impl State for ActivateRegion {
async fn next(
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
let mailbox_receiver = self
.send_open_region_message(ctx, failed_region, OPEN_REGION_MESSAGE_TIMEOUT)
.await?;
self.handle_response(mailbox_receiver, failed_region).await
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use api::v1::meta::mailbox_message::Payload;
use common_meta::instruction::SimpleReply;
use super::super::tests::TestingEnvBuilder;
use super::*;
#[tokio::test]
async fn test_activate_region_success() {
common_telemetry::init_default_ut_logging();
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let candidate = 2;
let mut state = ActivateRegion::new(Peer::new(candidate, ""));
let mailbox_receiver = state
.send_open_region_message(&env.context, &failed_region, Duration::from_millis(100))
.await
.unwrap();
let message_id = mailbox_receiver.message_id();
// verify that the open region message is sent
let rx = env.heartbeat_receivers.get_mut(&candidate).unwrap();
let resp = rx.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(received.id, message_id);
assert_eq!(received.subject, "Activate Region");
assert_eq!(received.from, "Metasrv@127.0.0.1:3002");
assert_eq!(received.to, "Datanode-(id=2, addr=)");
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
datanode_id: candidate,
..failed_region.clone()
},
&env.path,
HashMap::new(),
HashMap::new(),
false
)))
.unwrap(),
))
);
// simulating response from Datanode
env.context
.mailbox
.on_recv(
message_id,
Ok(MailboxMessage {
id: message_id,
subject: "Activate Region".to_string(),
from: "Datanode-2".to_string(),
to: "Metasrv".to_string(),
timestamp_millis: common_time::util::current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply {
result: true,
error: None,
}))
.unwrap(),
)),
}),
)
.await
.unwrap();
let next_state = state
.handle_response(mailbox_receiver, &failed_region)
.await
.unwrap();
assert_eq!(
format!("{next_state:?}"),
r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public", region_options: {}, region_wal_options: {} }"#
);
}
#[tokio::test]
async fn test_activate_region_timeout() {
common_telemetry::init_default_ut_logging();
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let candidate = 2;
let mut state = ActivateRegion::new(Peer::new(candidate, ""));
let mailbox_receiver = state
.send_open_region_message(&env.context, &failed_region, Duration::from_millis(100))
.await
.unwrap();
// verify that the open region message is sent
let rx = env.heartbeat_receivers.get_mut(&candidate).unwrap();
let resp = rx.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(received.id, mailbox_receiver.message_id());
assert_eq!(received.subject, "Activate Region");
assert_eq!(received.from, "Metasrv@127.0.0.1:3002");
assert_eq!(received.to, "Datanode-(id=2, addr=)");
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
datanode_id: candidate,
..failed_region.clone()
},
&env.path,
HashMap::new(),
HashMap::new(),
false
)))
.unwrap(),
))
);
let result = state
.handle_response(mailbox_receiver, &failed_region)
.await;
assert!(matches!(result, Err(Error::RetryLater { .. })));
}
}

View File

@@ -1,328 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionStatus;
use common_meta::RegionIdent;
use common_telemetry::{debug, info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use super::activate_region::ActivateRegion;
use super::{RegionFailoverContext, State};
use crate::error::{
self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxReceiver};
#[derive(Serialize, Deserialize, Debug)]
pub(super) struct DeactivateRegion {
candidate: Peer,
}
impl DeactivateRegion {
pub(super) fn new(candidate: Peer) -> Self {
Self { candidate }
}
async fn mark_leader_downgraded(
&self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<()> {
let table_id = failed_region.table_id;
let table_route_value = ctx
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::TableRouteNotFoundSnafu { table_id })?;
ctx.table_metadata_manager
.update_leader_region_status(table_id, &table_route_value, |region| {
if region.region.id.region_number() == failed_region.region_number {
Some(Some(RegionStatus::Downgraded))
} else {
None
}
})
.await
.context(error::UpdateTableRouteSnafu)?;
Ok(())
}
async fn send_close_region_message(
&self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<MailboxReceiver> {
let instruction = Instruction::CloseRegion(failed_region.clone());
let msg = MailboxMessage::json_message(
"Deactivate Region",
&format!("Metasrv@{}", ctx.selector_ctx.server_addr),
&format!("Datanode-{}", failed_region.datanode_id),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let ch = Channel::Datanode(failed_region.datanode_id);
let timeout = Duration::from_secs(ctx.region_lease_secs);
ctx.mailbox.send(&ch, msg, timeout).await
}
async fn handle_response(
&self,
_ctx: &RegionFailoverContext,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
match mailbox_receiver.await? {
Ok(msg) => {
debug!("Received deactivate region reply: {msg:?}");
let reply = HeartbeatMailbox::json_reply(&msg)?;
let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
return UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect close region reply",
}
.fail();
};
if result {
Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
} else {
// Under rare circumstances would a Datanode fail to close a Region.
// So simply retry.
let reason = format!(
"Region {failed_region:?} is not closed by Datanode {}, error: {error:?}",
failed_region.datanode_id,
);
RetryLaterSnafu { reason }.fail()
}
}
Err(Error::MailboxTimeout { .. }) => {
// We have configured the timeout to match the region lease timeout before making
// the call and have disabled region lease renewal. Therefore, if a timeout error
// occurs, it can be concluded that the region has been closed. With this information,
// we can proceed confidently to the next step.
Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
}
Err(e) => Err(e),
}
}
/// Sleep for `region_lease_expiry_seconds`, to make sure the region is closed (by its
/// region alive keeper). This is critical for region not being opened in multiple Datanodes
/// simultaneously.
async fn wait_for_region_lease_expiry(&self, ctx: &RegionFailoverContext) {
tokio::time::sleep(Duration::from_secs(ctx.region_lease_secs)).await;
}
}
#[async_trait]
#[typetag::serde]
impl State for DeactivateRegion {
async fn next(
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
info!("Deactivating region: {failed_region:?}");
self.mark_leader_downgraded(ctx, failed_region).await?;
let result = self.send_close_region_message(ctx, failed_region).await;
let mailbox_receiver = match result {
Ok(mailbox_receiver) => mailbox_receiver,
Err(Error::PusherNotFound { .. }) => {
warn!(
"Datanode {} is not reachable, skip deactivating region {}, just wait for the region lease to expire",
failed_region.datanode_id, failed_region
);
// See the mailbox received timeout situation comments above.
self.wait_for_region_lease_expiry(ctx).await;
return Ok(Box::new(ActivateRegion::new(self.candidate.clone())));
}
Err(e) => return Err(e),
};
self.handle_response(ctx, mailbox_receiver, failed_region)
.await
}
}
#[cfg(test)]
mod tests {
use api::v1::meta::mailbox_message::Payload;
use common_meta::instruction::SimpleReply;
use super::super::tests::TestingEnvBuilder;
use super::*;
#[tokio::test]
async fn test_mark_leader_downgraded() {
common_telemetry::init_default_ut_logging();
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let state = DeactivateRegion::new(Peer::new(2, ""));
state
.mark_leader_downgraded(&env.context, &failed_region)
.await
.unwrap();
let table_id = failed_region.table_id;
let table_route_value = env
.context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await
.unwrap()
.unwrap();
let should_downgraded = table_route_value
.region_routes()
.unwrap()
.iter()
.find(|route| route.region.id.region_number() == failed_region.region_number)
.unwrap();
assert!(should_downgraded.is_leader_downgraded());
}
#[tokio::test]
async fn test_deactivate_region_success() {
common_telemetry::init_default_ut_logging();
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let state = DeactivateRegion::new(Peer::new(2, ""));
let mailbox_receiver = state
.send_close_region_message(&env.context, &failed_region)
.await
.unwrap();
let message_id = mailbox_receiver.message_id();
// verify that the close region message is sent
let rx = env
.heartbeat_receivers
.get_mut(&failed_region.datanode_id)
.unwrap();
let resp = rx.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(received.id, message_id);
assert_eq!(received.subject, "Deactivate Region");
assert_eq!(received.from, "Metasrv@127.0.0.1:3002");
assert_eq!(received.to, "Datanode-1");
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::CloseRegion(failed_region.clone())).unwrap(),
))
);
// simulating response from Datanode
env.context
.mailbox
.on_recv(
message_id,
Ok(MailboxMessage {
id: message_id,
subject: "Deactivate Region".to_string(),
from: "Datanode-1".to_string(),
to: "Metasrv".to_string(),
timestamp_millis: common_time::util::current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
}))
.unwrap(),
)),
}),
)
.await
.unwrap();
let next_state = state
.handle_response(&env.context, mailbox_receiver, &failed_region)
.await
.unwrap();
assert_eq!(
format!("{next_state:?}"),
r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None, region_wal_options: None }"#
);
}
#[tokio::test]
async fn test_deactivate_region_timeout() {
common_telemetry::init_default_ut_logging();
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let state = DeactivateRegion::new(Peer::new(2, ""));
let mailbox_receiver = state
.send_close_region_message(&env.context, &failed_region)
.await
.unwrap();
// verify that the open region message is sent
let rx = env
.heartbeat_receivers
.get_mut(&failed_region.datanode_id)
.unwrap();
let resp = rx.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(received.id, mailbox_receiver.message_id());
assert_eq!(received.subject, "Deactivate Region");
assert_eq!(received.from, "Metasrv@127.0.0.1:3002");
assert_eq!(received.to, "Datanode-1");
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::CloseRegion(failed_region.clone())).unwrap(),
))
);
let next_state = state
.handle_response(&env.context, mailbox_receiver, &failed_region)
.await
.unwrap();
// Timeout or not, proceed to `ActivateRegion`.
assert_eq!(
format!("{next_state:?}"),
r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None, region_wal_options: None }"#
);
}
}

View File

@@ -1,36 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_meta::RegionIdent;
use common_procedure::Status;
use serde::{Deserialize, Serialize};
use super::{RegionFailoverContext, State};
use crate::error::Result;
#[derive(Serialize, Deserialize, Debug)]
pub(super) struct RegionFailoverEnd;
#[async_trait]
#[typetag::serde]
impl State for RegionFailoverEnd {
async fn next(&mut self, _: &RegionFailoverContext, _: &RegionIdent) -> Result<Box<dyn State>> {
Ok(Box::new(RegionFailoverEnd))
}
fn status(&self) -> Status {
Status::done()
}
}

View File

@@ -1,136 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_meta::peer::Peer;
use common_meta::RegionIdent;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{ensure, location, Location};
use super::deactivate_region::DeactivateRegion;
use super::{RegionFailoverContext, State};
use crate::error::{self, RegionFailoverCandidatesNotFoundSnafu, Result};
use crate::selector::SelectorOptions;
#[derive(Serialize, Deserialize, Debug)]
pub(super) struct RegionFailoverStart {
failover_candidate: Option<Peer>,
}
impl RegionFailoverStart {
pub(super) fn new() -> Self {
Self {
failover_candidate: None,
}
}
async fn choose_candidate(
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Peer> {
if let Some(candidate) = self.failover_candidate.clone() {
return Ok(candidate);
}
let mut selector_ctx = ctx.selector_ctx.clone();
selector_ctx.table_id = Some(failed_region.table_id);
let cluster_id = failed_region.cluster_id;
let opts = SelectorOptions::default();
let candidates = ctx
.selector
.select(cluster_id, &selector_ctx, opts)
.await?
.iter()
.filter_map(|p| {
if p.id != failed_region.datanode_id {
Some(p.clone())
} else {
None
}
})
.collect::<Vec<Peer>>();
ensure!(
!candidates.is_empty(),
RegionFailoverCandidatesNotFoundSnafu {
failed_region: format!("{failed_region:?}"),
}
);
// Safety: indexing is guarded by the "ensure!" above.
let candidate = &candidates[0];
self.failover_candidate = Some(candidate.clone());
info!("Choose failover candidate datanode {candidate:?} for region: {failed_region}");
Ok(candidate.clone())
}
}
#[async_trait]
#[typetag::serde]
impl State for RegionFailoverStart {
async fn next(
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
let candidate = self
.choose_candidate(ctx, failed_region)
.await
.map_err(|e| {
if e.status_code() == StatusCode::RuntimeResourcesExhausted {
error::Error::RetryLaterWithSource {
reason: format!("Region failover aborted for {failed_region:?}"),
location: location!(),
source: BoxedError::new(e),
}
} else {
e
}
})?;
return Ok(Box::new(DeactivateRegion::new(candidate)));
}
}
#[cfg(test)]
mod tests {
use super::super::tests::TestingEnvBuilder;
use super::*;
#[tokio::test]
async fn test_choose_failover_candidate() {
common_telemetry::init_default_ut_logging();
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let mut state = RegionFailoverStart::new();
assert!(state.failover_candidate.is_none());
let candidate = state
.choose_candidate(&env.context, &failed_region)
.await
.unwrap();
assert_ne!(candidate.id, failed_region.datanode_id);
let candidate_again = state
.choose_candidate(&env.context, &failed_region)
.await
.unwrap();
assert_eq!(candidate, candidate_again);
}
}

View File

@@ -1,144 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::RegionIdent;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use table::metadata::TableId;
use super::failover_end::RegionFailoverEnd;
use super::{RegionFailoverContext, State};
use crate::error::{self, Result};
use crate::service::mailbox::BroadcastChannel;
#[derive(Serialize, Deserialize, Debug, Default)]
pub(super) struct InvalidateCache;
impl InvalidateCache {
async fn broadcast_invalidate_table_cache_messages(
&self,
ctx: &RegionFailoverContext,
table_id: TableId,
) -> Result<()> {
let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]);
let msg = &MailboxMessage::json_message(
"Invalidate Table Cache",
&format!("Metasrv@{}", ctx.selector_ctx.server_addr),
"Frontend broadcast",
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
ctx.mailbox
.broadcast(&BroadcastChannel::Frontend, msg)
.await
}
}
#[async_trait]
#[typetag::serde]
impl State for InvalidateCache {
async fn next(
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
let table_id = failed_region.table_id;
info!(
"Broadcast invalidate table({}) cache message to frontend",
table_id
);
self.broadcast_invalidate_table_cache_messages(ctx, table_id)
.await?;
Ok(Box::new(RegionFailoverEnd))
}
}
#[cfg(test)]
mod tests {
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::RequestHeader;
use super::super::tests::TestingEnvBuilder;
use super::*;
use crate::handler::Pusher;
use crate::procedure::region_failover::tests::TestingEnv;
use crate::service::mailbox::Channel;
#[tokio::test]
async fn test_invalidate_table_cache() {
common_telemetry::init_default_ut_logging();
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let TestingEnv {
mut heartbeat_receivers,
context,
pushers,
..
} = env;
for frontend_id in 4..=7 {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let pusher_id = Channel::Frontend(frontend_id).pusher_id();
let pusher = Pusher::new(tx, &RequestHeader::default());
let _ = pushers.insert(pusher_id, pusher).await;
let _ = heartbeat_receivers.insert(frontend_id, rx);
}
let table_id = failed_region.table_id;
// lexicographical order
// frontend-4,5,6,7
let next_state = InvalidateCache
.next(&context, &failed_region)
.await
.unwrap();
assert_eq!(format!("{next_state:?}"), "RegionFailoverEnd");
for i in 4..=7 {
// frontend id starts from 4
let rx = heartbeat_receivers.get_mut(&i).unwrap();
let resp = rx.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(received.id, 0);
assert_eq!(received.subject, "Invalidate Table Cache");
assert_eq!(received.from, "Metasrv@127.0.0.1:3002");
assert_eq!(received.to, "Frontend broadcast");
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::InvalidateCaches(vec![
CacheIdent::TableId(table_id)
]))
.unwrap(),
))
);
}
}
}

View File

@@ -1,496 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::key::datanode_table::RegionInfo;
use common_meta::key::table_route::TableRouteKey;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_meta::RegionIdent;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use super::invalidate_cache::InvalidateCache;
use super::{RegionFailoverContext, State};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::lock::keys::table_metadata_lock_key;
use crate::lock::Opts;
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub(super) struct UpdateRegionMetadata {
candidate: Peer,
region_storage_path: String,
region_options: HashMap<String, String>,
#[serde(default)]
region_wal_options: HashMap<RegionNumber, String>,
}
impl UpdateRegionMetadata {
pub(super) fn new(
candidate: Peer,
region_storage_path: String,
region_options: HashMap<String, String>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
Self {
candidate,
region_storage_path,
region_options,
region_wal_options,
}
}
/// Updates the metadata of the table.
async fn update_metadata(
&self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<()> {
let key = table_metadata_lock_key(failed_region);
let key = ctx.dist_lock.lock(key, Opts::default()).await?;
self.update_table_route(ctx, failed_region).await?;
ctx.dist_lock.unlock(key).await?;
Ok(())
}
async fn update_table_route(
&self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<()> {
let table_id = failed_region.table_id;
let engine = &failed_region.engine;
let table_route_value = ctx
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.await
.context(error::TableMetadataManagerSnafu)?
.context(TableRouteNotFoundSnafu { table_id })?;
let mut new_region_routes = table_route_value
.region_routes()
.context(error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
})?
.clone();
for region_route in new_region_routes.iter_mut() {
if region_route.region.id.region_number() == failed_region.region_number {
region_route.leader_peer = Some(self.candidate.clone());
region_route.set_leader_status(None);
break;
}
}
pretty_log_table_route_change(
TableRouteKey::new(table_id).to_string(),
&new_region_routes,
failed_region,
);
ctx.table_metadata_manager
.update_table_route(
table_id,
RegionInfo {
engine: engine.to_string(),
region_storage_path: self.region_storage_path.to_string(),
region_options: self.region_options.clone(),
region_wal_options: self.region_wal_options.clone(),
},
&table_route_value,
new_region_routes,
&self.region_options,
&self.region_wal_options,
)
.await
.context(error::UpdateTableRouteSnafu)?;
Ok(())
}
}
fn pretty_log_table_route_change(
key: String,
region_routes: &[RegionRoute],
failed_region: &RegionIdent,
) {
let region_routes = region_routes
.iter()
.map(|x| {
format!(
"{{region: {}, leader: {}, followers: [{}]}}",
x.region.id,
x.leader_peer
.as_ref()
.map(|p| p.id.to_string())
.unwrap_or_else(|| "?".to_string()),
x.follower_peers
.iter()
.map(|p| p.id.to_string())
.collect::<Vec<_>>()
.join(","),
)
})
.collect::<Vec<_>>();
info!(
"Updating region routes in table route value (key = '{}') to [{}]. \
Failed region {} was on Datanode {}.",
key,
region_routes.join(", "),
failed_region.region_number,
failed_region.datanode_id,
);
}
#[async_trait]
#[typetag::serde]
impl State for UpdateRegionMetadata {
async fn next(
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
self.update_metadata(ctx, failed_region)
.await
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update metadata for failed region: {}",
failed_region
),
})?;
Ok(Box::new(InvalidateCache))
}
}
#[cfg(test)]
mod tests {
use common_meta::rpc::router::{extract_all_peers, region_distribution};
use futures::TryStreamExt;
use super::super::tests::{TestingEnv, TestingEnvBuilder};
use super::{State, *};
use crate::test_util::new_region_route;
#[tokio::test]
async fn test_next_state() {
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let mut state = UpdateRegionMetadata::new(
Peer::new(2, ""),
env.path.clone(),
HashMap::new(),
HashMap::new(),
);
let next_state = state.next(&env.context, &failed_region).await.unwrap();
assert_eq!(format!("{next_state:?}"), "InvalidateCache");
}
#[tokio::test]
async fn test_update_table_route() {
common_telemetry::init_default_ut_logging();
async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> Vec<RegionRoute> {
let failed_region = env.failed_region(failed_region).await;
let state = UpdateRegionMetadata::new(
Peer::new(candidate, ""),
env.path.clone(),
HashMap::new(),
HashMap::new(),
);
state
.update_table_route(&env.context, &failed_region)
.await
.unwrap();
let table_id = failed_region.table_id;
env.context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(table_id)
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes()
.unwrap()
.clone()
}
// Original region routes:
// region number => leader node
// 1 => 1
// 2 => 1
// 3 => 2
// 4 => 3
// Testing failed region 1 moves to Datanode 2.
let env = TestingEnvBuilder::new().build().await;
let actual = test(env, 1, 2).await;
// Expected region routes:
// region number => leader node
// 1 => 2
// 2 => 1
// 3 => 2
// 4 => 3
let peers = &extract_all_peers(&actual);
assert_eq!(peers.len(), 3);
let expected = vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 1),
new_region_route(3, peers, 2),
new_region_route(4, peers, 3),
];
assert_eq!(actual, expected);
// Testing failed region 3 moves to Datanode 3.
let env = TestingEnvBuilder::new().build().await;
let actual = test(env, 3, 3).await;
// Expected region routes:
// region number => leader node
// 1 => 1
// 2 => 1
// 3 => 3
// 4 => 3
let peers = &extract_all_peers(&actual);
assert_eq!(peers.len(), 2);
let expected = vec![
new_region_route(1, peers, 1),
new_region_route(2, peers, 1),
new_region_route(3, peers, 3),
new_region_route(4, peers, 3),
];
assert_eq!(actual, expected);
// Testing failed region 1 moves to a new Datanode, 4.
let env = TestingEnvBuilder::new().build().await;
let actual = test(env, 1, 4).await;
// Expected region routes:
// region number => leader node
// 1 => 4
// 2 => 1
// 3 => 2
// 4 => 3
let peers = &extract_all_peers(&actual);
assert_eq!(peers.len(), 4);
let expected = vec![
new_region_route(1, peers, 4),
new_region_route(2, peers, 1),
new_region_route(3, peers, 2),
new_region_route(4, peers, 3),
];
assert_eq!(actual, expected);
// Testing failed region 3 moves to a new Datanode, 4.
let env = TestingEnvBuilder::new().build().await;
let actual = test(env, 3, 4).await;
// Expected region routes:
// region number => leader node
// 1 => 1
// 2 => 1
// 3 => 4
// 4 => 3
let peers = &extract_all_peers(&actual);
assert_eq!(peers.len(), 3);
let expected = vec![
new_region_route(1, peers, 1),
new_region_route(2, peers, 1),
new_region_route(3, peers, 4),
new_region_route(4, peers, 3),
];
assert_eq!(actual, expected);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_update_metadata_concurrently() {
common_telemetry::init_default_ut_logging();
// Test the correctness of concurrently updating the region distribution in table region
// value, and region routes in table route value. Region 1 moves to Datanode 2; region 2
// moves to Datanode 3.
//
// Datanode => Regions
// Before: | After:
// 1 => 1, 2 |
// 2 => 3 | 2 => 3, 1
// 3 => 4 | 3 => 4, 2
//
// region number => leader node
// Before: | After:
// 1 => 1 | 1 => 2
// 2 => 1 | 2 => 3
// 3 => 2 | 3 => 2
// 4 => 3 | 4 => 3
//
// Test case runs 10 times to enlarge the possibility of concurrent updating.
for _ in 0..10 {
let env = TestingEnvBuilder::new().build().await;
let ctx_1 = env.context.clone();
let ctx_2 = env.context.clone();
let failed_region_1 = env.failed_region(1).await;
let failed_region_2 = env.failed_region(2).await;
let table_id = failed_region_1.table_id;
let path = env.path.clone();
let _ = futures::future::join_all(vec![
tokio::spawn(async move {
let state = UpdateRegionMetadata::new(
Peer::new(2, ""),
path,
HashMap::new(),
HashMap::new(),
);
state
.update_metadata(&ctx_1, &failed_region_1)
.await
.unwrap();
}),
tokio::spawn(async move {
let state = UpdateRegionMetadata::new(
Peer::new(3, ""),
env.path.clone(),
HashMap::new(),
HashMap::new(),
);
state
.update_metadata(&ctx_2, &failed_region_2)
.await
.unwrap();
}),
])
.await;
let table_route_value = env
.context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await
.unwrap()
.unwrap();
let peers = &extract_all_peers(table_route_value.region_routes().unwrap());
let actual = table_route_value.region_routes().unwrap();
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 3),
new_region_route(3, peers, 2),
new_region_route(4, peers, 3),
];
assert_eq!(peers.len(), 2);
assert_eq!(actual, expected);
let manager = &env.context.table_metadata_manager;
let table_route_value = manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await
.unwrap()
.unwrap();
let map = region_distribution(table_route_value.region_routes().unwrap());
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![1, 3]));
assert_eq!(map.get(&3), Some(&vec![2, 4]));
// test DatanodeTableValues matches the table region distribution
let datanode_table_manager = manager.datanode_table_manager();
let tables = datanode_table_manager
.tables(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert!(tables.is_empty());
let tables = datanode_table_manager
.tables(2)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(tables.len(), 1);
assert_eq!(tables[0].table_id, 1);
assert_eq!(tables[0].regions, vec![1, 3]);
let tables = datanode_table_manager
.tables(3)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(tables.len(), 1);
assert_eq!(tables[0].table_id, 1);
assert_eq!(tables[0].regions, vec![2, 4]);
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyUpdateRegionMetadata {
candidate: Peer,
region_storage_path: String,
region_options: HashMap<String, String>,
}
#[test]
fn test_compatible_serialize_update_region_metadata() {
let candidate = Peer::new(1, "test_addr");
let region_storage_path = "test_path".to_string();
let region_options = HashMap::from([
("a".to_string(), "aa".to_string()),
("b".to_string(), "bb".to_string()),
]);
let legacy_update_region_metadata = LegacyUpdateRegionMetadata {
candidate: candidate.clone(),
region_storage_path: region_storage_path.clone(),
region_options: region_options.clone(),
};
// Serialize a LegacyUpdateRegionMetadata.
let serialized = serde_json::to_string(&legacy_update_region_metadata).unwrap();
// Deserialize to UpdateRegionMetadata.
let deserialized = serde_json::from_str(&serialized).unwrap();
let expected = UpdateRegionMetadata {
candidate,
region_storage_path,
region_options,
region_wal_options: HashMap::new(),
};
assert_eq!(expected, deserialized);
}
}

View File

@@ -12,22 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::ClusterId;
use common_time::util as time_util;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::key::{DatanodeLeaseKey, LeaseValue};
@@ -72,69 +63,6 @@ pub(crate) fn create_selector_context() -> SelectorContext {
}
}
pub(crate) async fn prepare_table_region_and_info_value(
table_metadata_manager: &TableMetadataManagerRef,
table: &str,
) {
let table_info = RawTableInfo {
ident: TableIdent::new(1),
name: table.to_string(),
desc: None,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
meta: RawTableMeta {
schema: RawSchema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::string_datatype(),
true,
)]),
primary_key_indices: vec![],
value_indices: vec![],
engine: MITO_ENGINE.to_string(),
next_column_id: 1,
region_numbers: vec![1, 2, 3, 4],
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
},
table_type: TableType::Base,
};
let region_route_factory = |region_id: u64, peer: u64| RegionRoute {
region: Region {
id: region_id.into(),
..Default::default()
},
leader_peer: Some(Peer {
id: peer,
addr: String::new(),
}),
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
};
// Region distribution:
// Datanode => Regions
// 1 => 1, 2
// 2 => 3
// 3 => 4
let region_routes = vec![
region_route_factory(1, 1),
region_route_factory(2, 1),
region_route_factory(3, 2),
region_route_factory(4, 3),
];
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
}
pub(crate) async fn put_datanodes(
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,

View File

@@ -19,10 +19,7 @@ mod http;
#[macro_use]
mod sql;
#[macro_use]
#[allow(dead_code)]
mod region_migration;
// #[macro_use]
// mod region_failover;
grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);

View File

@@ -1,376 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use client::OutputData;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::{MetaKey, RegionDistribution};
use common_meta::peer::Peer;
use common_meta::{distributed_time_constants, RegionIdent};
use common_procedure::{watcher, ProcedureWithId};
use common_query::Output;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use frontend::error::Result as FrontendResult;
use frontend::instance::Instance;
use futures::TryStreamExt;
use meta_srv::error::Result as MetaResult;
use meta_srv::metasrv::{SelectorContext, SelectorRef};
use meta_srv::procedure::region_failover::{RegionFailoverContext, RegionFailoverProcedure};
use meta_srv::selector::{Namespace, Selector, SelectorOptions};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use table::metadata::TableId;
use tests_integration::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
use tests_integration::test_util::{get_test_store_config, StorageType};
use tokio::time;
#[macro_export]
macro_rules! region_failover_test {
($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
paste::item! {
mod [<integration_region_failover_ $service:lower _test>] {
$(
#[tokio::test(flavor = "multi_thread")]
$(
#[$meta]
)*
async fn [< $test >]() {
let store_type = tests_integration::test_util::StorageType::$service;
if store_type.test_on() {
let _ = $crate::region_failover::$test(store_type).await;
}
}
)*
}
}
};
}
#[macro_export]
macro_rules! region_failover_tests {
($($service:ident),*) => {
$(
region_failover_test!(
$service,
test_region_failover,
);
)*
};
}
pub async fn test_region_failover(store_type: StorageType) {
if store_type == StorageType::File {
// Region failover doesn't make sense when using local file storage.
return;
}
common_telemetry::init_default_ut_logging();
info!("Running region failover test for {}", store_type);
let mut logical_timer = 1685508715000;
let cluster_name = "test_region_failover";
let (store_config, _guard) = get_test_store_config(&store_type);
let datanodes = 5u64;
let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.build()
.await;
let frontend = cluster.frontend.clone();
let table_id = prepare_testing_table(&cluster).await;
let results = insert_values(&frontend, logical_timer).await;
logical_timer += 1000;
for result in results {
assert!(matches!(result.unwrap().data, OutputData::AffectedRows(1)));
}
assert!(has_route_cache(&frontend, table_id).await);
let distribution = find_region_distribution(&cluster, table_id).await;
info!("Find region distribution: {distribution:?}");
let mut foreign = 0;
for dn in 1..=datanodes {
if !&distribution.contains_key(&dn) {
foreign = dn
}
}
let selector = Arc::new(ForeignNodeSelector {
foreign: Peer {
id: foreign,
// "127.0.0.1:3001" is just a placeholder, does not actually connect to it.
addr: "127.0.0.1:3001".to_string(),
},
});
let failed_region = choose_failed_region(distribution);
info!("Simulating failed region: {failed_region:#?}");
run_region_failover_procedure(&cluster, failed_region.clone(), selector).await;
let distribution = find_region_distribution(&cluster, table_id).await;
info!("Find region distribution again: {distribution:?}");
// Waits for invalidating table cache
time::sleep(Duration::from_millis(100)).await;
assert!(!has_route_cache(&frontend, table_id).await);
// Inserts data to each datanode after failover
let frontend = cluster.frontend.clone();
let results = insert_values(&frontend, logical_timer).await;
for result in results {
assert!(matches!(result.unwrap().data, OutputData::AffectedRows(1)));
}
assert_values(&frontend).await;
assert!(!distribution.contains_key(&failed_region.datanode_id));
let mut success = false;
let values = distribution.values();
for val in values {
success = success || val.contains(&failed_region.region_number);
}
assert!(success)
}
async fn has_route_cache(instance: &Arc<Instance>, table_id: TableId) -> bool {
let catalog_manager = instance
.catalog_manager()
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.unwrap();
let kv_backend = catalog_manager.table_metadata_manager_ref().kv_backend();
let cache = kv_backend
.as_any()
.downcast_ref::<CachedMetaKvBackend>()
.unwrap()
.cache();
cache
.get(TableRouteKey::new(table_id).to_bytes().as_slice())
.await
.is_some()
}
async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
let query_ctx = QueryContext::arc();
let mut results = Vec::new();
for range in [5, 15, 25, 55] {
let result = insert_value(
instance,
&format!("INSERT INTO my_table VALUES ({},{})", range, ts),
query_ctx.clone(),
)
.await;
results.push(result);
}
results
}
async fn insert_value(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> FrontendResult<Output> {
instance.do_query(sql, query_ctx).await.remove(0)
}
async fn assert_values(instance: &Arc<Instance>) {
let query_ctx = QueryContext::arc();
let result = instance
.do_query("select * from my_table order by i, ts", query_ctx)
.await
.remove(0);
let expected = "\
+----+---------------------+
| i | ts |
+----+---------------------+
| 5 | 2023-05-31T04:51:55 |
| 5 | 2023-05-31T04:51:56 |
| 15 | 2023-05-31T04:51:55 |
| 15 | 2023-05-31T04:51:56 |
| 25 | 2023-05-31T04:51:55 |
| 25 | 2023-05-31T04:51:56 |
| 55 | 2023-05-31T04:51:55 |
| 55 | 2023-05-31T04:51:56 |
+----+---------------------+";
check_output_stream(result.unwrap().data, expected).await;
}
async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId {
let sql = r"
CREATE TABLE my_table (
i INT PRIMARY KEY,
ts TIMESTAMP TIME INDEX,
) PARTITION BY RANGE COLUMNS (i) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)";
let result = cluster.frontend.do_query(sql, QueryContext::arc()).await;
result.first().unwrap().as_ref().unwrap();
let table = cluster
.frontend
.catalog_manager()
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table")
.await
.unwrap()
.unwrap();
table.table_info().table_id()
}
async fn find_region_distribution(
cluster: &GreptimeDbCluster,
table_id: TableId,
) -> RegionDistribution {
let manager = cluster.metasrv.table_metadata_manager();
let region_distribution = manager
.table_route_manager()
.get_region_distribution(table_id)
.await
.unwrap()
.unwrap();
// test DatanodeTableValues match the table region distribution
for datanode_id in cluster.datanode_instances.keys() {
let mut actual = manager
.datanode_table_manager()
.tables(*datanode_id)
.try_collect::<Vec<_>>()
.await
.unwrap()
.into_iter()
.filter_map(|x| {
if x.table_id == table_id {
Some(x.regions)
} else {
None
}
})
.flatten()
.collect::<Vec<_>>();
actual.sort();
if let Some(mut expected) = region_distribution.get(datanode_id).cloned() {
expected.sort();
assert_eq!(expected, actual);
} else {
assert!(actual.is_empty());
}
}
region_distribution
}
fn choose_failed_region(distribution: RegionDistribution) -> RegionIdent {
let (failed_datanode, failed_region) = distribution
.iter()
.filter_map(|(datanode_id, regions)| {
if !regions.is_empty() {
Some((*datanode_id, regions[0]))
} else {
None
}
})
.next()
.unwrap();
RegionIdent {
cluster_id: 1000,
datanode_id: failed_datanode,
table_id: 1025,
region_number: failed_region,
engine: "mito2".to_string(),
}
}
// The "foreign" means the Datanode is not containing any regions to the table before.
pub struct ForeignNodeSelector {
pub foreign: Peer,
}
#[async_trait::async_trait]
impl Selector for ForeignNodeSelector {
type Context = SelectorContext;
type Output = Vec<Peer>;
async fn select(
&self,
_ns: Namespace,
_ctx: &Self::Context,
_opts: SelectorOptions,
) -> MetaResult<Self::Output> {
Ok(vec![self.foreign.clone()])
}
}
async fn run_region_failover_procedure(
cluster: &GreptimeDbCluster,
failed_region: RegionIdent,
selector: SelectorRef,
) {
let metasrv = &cluster.metasrv;
let procedure_manager = metasrv.procedure_manager();
let procedure = RegionFailoverProcedure::new(
"greptime".into(),
"public".into(),
failed_region.clone(),
RegionFailoverContext {
region_lease_secs: 10,
in_memory: metasrv.in_memory().clone(),
kv_backend: metasrv.kv_backend().clone(),
mailbox: metasrv.mailbox().clone(),
selector,
selector_ctx: SelectorContext {
datanode_lease_secs: distributed_time_constants::REGION_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::REGION_LEASE_SECS,
server_addr: metasrv.options().server_addr.clone(),
kv_backend: metasrv.kv_backend().clone(),
meta_peer_client: metasrv.meta_peer_client().clone(),
table_id: None,
},
dist_lock: metasrv.lock().clone(),
table_metadata_manager: metasrv.table_metadata_manager().clone(),
},
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region failover procedure {procedure_id} for region {failed_region:?}");
let watcher = &mut procedure_manager.submit(procedure_with_id).await.unwrap();
watcher::wait(watcher).await.unwrap();
}