feat: update table metadata in lock (#1634)

* feat: using distributed lock to guard against the concurrent updating of table metadatas in region failover procedure

* fix: resolve PR comments

* fix: resolve PR comments
This commit is contained in:
LFC
2023-05-30 08:59:14 +08:00
committed by GitHub
parent 9e21632f23
commit 51b23664f7
13 changed files with 427 additions and 172 deletions

View File

@@ -13,6 +13,8 @@
// limitations under the License.
pub mod etcd;
pub(crate) mod keys;
pub(crate) mod memory;
use std::sync::Arc;
@@ -28,6 +30,14 @@ pub struct Opts {
pub expire_secs: Option<u64>,
}
impl Default for Opts {
fn default() -> Self {
Opts {
expire_secs: Some(DEFAULT_EXPIRE_TIME_SECS),
}
}
}
#[async_trait::async_trait]
pub trait DistLock: Send + Sync {
// Lock acquires a distributed shared lock on a given named lock. On success, it

View File

@@ -0,0 +1,28 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! All keys used for distributed locking in the Metasrv.
//! Place them in this unified module for better maintenance.
use common_meta::RegionIdent;
use crate::lock::Key;
pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key {
format!(
"table_metadata_lock_({}-{}.{}.{}-{})",
region.cluster_id, region.catalog, region.schema, region.table, region.table_id,
)
.into_bytes()
}

View File

@@ -0,0 +1,112 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use dashmap::DashMap;
use tokio::sync::{Mutex, OwnedMutexGuard};
use crate::error::Result;
use crate::lock::{DistLock, Key, Opts};
#[derive(Default)]
pub(crate) struct MemLock {
mutexes: DashMap<Key, Arc<Mutex<()>>>,
guards: DashMap<Key, OwnedMutexGuard<()>>,
}
#[async_trait]
impl DistLock for MemLock {
async fn lock(&self, key: Vec<u8>, _opts: Opts) -> Result<Key> {
let mutex = self
.mutexes
.entry(key.clone())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone();
let guard = mutex.lock_owned().await;
self.guards.insert(key.clone(), guard);
Ok(key)
}
async fn unlock(&self, key: Vec<u8>) -> Result<()> {
// drop the guard, so that the mutex can be unlocked,
// effectively make the `mutex.lock_owned` in `lock` method to proceed
self.guards.remove(&key);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use rand::seq::SliceRandom;
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn test_mem_lock_concurrently() {
let lock = Arc::new(MemLock::default());
let keys = (0..10)
.map(|i| format!("my-lock-{i}").into_bytes())
.collect::<Vec<Key>>();
let counters: [(Key, AtomicU32); 10] = keys
.iter()
.map(|x| (x.clone(), AtomicU32::new(0)))
.collect::<Vec<_>>()
.try_into()
.unwrap();
let counters = Arc::new(HashMap::from(counters));
let tasks = (0..100)
.map(|_| {
let mut keys = keys.clone();
keys.shuffle(&mut rand::thread_rng());
let lock_clone = lock.clone();
let counters_clone = counters.clone();
tokio::spawn(async move {
// every key counter will be added by 1 for 10 times
for i in 0..100 {
let key = &keys[i % keys.len()];
lock_clone
.lock(key.clone(), Opts { expire_secs: None })
.await
.unwrap();
// Intentionally create a critical section:
// if our MemLock is flawed, the resulting counter is wrong.
//
// Note that AtomicU32 is only used to enable the updates from multiple tasks,
// does not make any guarantee about the correctness of the result.
let counter = counters_clone.get(key).unwrap();
let v = counter.load(Ordering::Relaxed);
counter.store(v + 1, Ordering::Relaxed);
lock_clone.unlock(key.clone()).await.unwrap();
}
})
})
.collect::<Vec<_>>();
futures::future::join_all(tasks).await;
assert!(counters.values().all(|x| x.load(Ordering::Relaxed) == 1000));
}
}

View File

@@ -120,7 +120,7 @@ pub struct MetaSrv {
handler_group: HeartbeatHandlerGroup,
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClient>,
lock: Option<DistLockRef>,
lock: DistLockRef,
procedure_manager: ProcedureManagerRef,
metadata_service: MetadataServiceRef,
mailbox: MailboxRef,
@@ -244,8 +244,8 @@ impl MetaSrv {
}
#[inline]
pub fn lock(&self) -> Option<DistLockRef> {
self.lock.clone()
pub fn lock(&self) -> &DistLockRef {
&self.lock
}
#[inline]

View File

@@ -25,6 +25,7 @@ use crate::handler::{
KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler,
ResponseHeaderHandler,
};
use crate::lock::memory::MemLock;
use crate::lock::DistLockRef;
use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef};
use crate::metasrv::{
@@ -140,6 +141,8 @@ impl MetaSrvBuilder {
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
let handler_group = match handler_group {
Some(handler_group) => handler_group,
None => {
@@ -154,6 +157,7 @@ impl MetaSrvBuilder {
catalog: None,
schema: None,
},
lock.clone(),
));
let region_failure_handler =

View File

@@ -38,6 +38,7 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{Error, RegisterProcedureLoaderSnafu, Result};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::service::mailbox::MailboxRef;
@@ -49,6 +50,7 @@ pub(crate) struct RegionFailoverManager {
procedure_manager: ProcedureManagerRef,
selector: SelectorRef,
selector_ctx: SelectorContext,
dist_lock: DistLockRef,
running_procedures: Arc<Mutex<HashSet<RegionIdent>>>,
}
@@ -72,33 +74,35 @@ impl RegionFailoverManager {
procedure_manager: ProcedureManagerRef,
selector: SelectorRef,
selector_ctx: SelectorContext,
dist_lock: DistLockRef,
) -> Self {
Self {
mailbox,
procedure_manager,
selector,
selector_ctx,
dist_lock,
running_procedures: Arc::new(Mutex::new(HashSet::new())),
}
}
fn create_context(&self) -> RegionFailoverContext {
RegionFailoverContext {
mailbox: self.mailbox.clone(),
selector: self.selector.clone(),
selector_ctx: self.selector_ctx.clone(),
dist_lock: self.dist_lock.clone(),
}
}
pub(crate) fn try_start(&self) -> Result<()> {
let mailbox = self.mailbox.clone();
let selector = self.selector.clone();
let selector_ctx = self.selector_ctx.clone();
let context = self.create_context();
self.procedure_manager
.register_loader(
RegionFailoverProcedure::TYPE_NAME,
Box::new(move |json| {
RegionFailoverProcedure::from_json(
json,
RegionFailoverContext {
mailbox: mailbox.clone(),
selector: selector.clone(),
selector_ctx: selector_ctx.clone(),
},
)
.map(|p| Box::new(p) as _)
let context = context.clone();
RegionFailoverProcedure::from_json(json, context).map(|p| Box::new(p) as _)
}),
)
.context(RegisterProcedureLoaderSnafu {
@@ -120,14 +124,8 @@ impl RegionFailoverManager {
return;
}
let procedure = RegionFailoverProcedure::new(
failed_region.clone(),
RegionFailoverContext {
mailbox: self.mailbox.clone(),
selector: self.selector.clone(),
selector_ctx: self.selector_ctx.clone(),
},
);
let context = self.create_context();
let procedure = RegionFailoverProcedure::new(failed_region.clone(), context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region failover procedure {procedure_id} for region {failed_region:?}");
@@ -172,6 +170,7 @@ pub struct RegionFailoverContext {
pub mailbox: MailboxRef,
pub selector: SelectorRef,
pub selector_ctx: SelectorContext,
pub dist_lock: DistLockRef,
}
/// The state machine of region failover procedure. Driven by the call to `next`.
@@ -316,6 +315,7 @@ mod tests {
use super::*;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::lock::memory::MemLock;
use crate::selector::{Namespace, Selector};
use crate::sequence::Sequence;
use crate::service::mailbox::Channel;
@@ -356,21 +356,54 @@ mod tests {
pub struct TestingEnv {
pub context: RegionFailoverContext,
pub failed_region: RegionIdent,
pub heartbeat_receivers: HashMap<DatanodeId, Receiver<tonic::Result<HeartbeatResponse>>>,
}
impl TestingEnv {
pub async fn failed_region(&self, region_number: u32) -> RegionIdent {
let table = "my_table";
let key = TableGlobalKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table.to_string(),
};
let value =
table_routes::get_table_global_value(&self.context.selector_ctx.kv_store, &key)
.await
.unwrap()
.unwrap();
let failed_datanode = value
.regions_id_map
.iter()
.find_map(|(&datanode_id, regions)| {
if regions.contains(&region_number) {
Some(datanode_id)
} else {
None
}
})
.unwrap();
RegionIdent {
cluster_id: 0,
datanode_id: failed_datanode,
table_id: 1,
engine: MITO_ENGINE.to_string(),
region_number,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table.to_string(),
}
}
}
pub struct TestingEnvBuilder {
selector: Option<SelectorRef>,
failed_region: Option<u32>,
}
impl TestingEnvBuilder {
pub fn new() -> Self {
Self {
selector: None,
failed_region: None,
}
Self { selector: None }
}
#[allow(unused)]
@@ -379,11 +412,6 @@ mod tests {
self
}
pub fn with_failed_region(mut self, failed_region: u32) -> Self {
self.failed_region = Some(failed_region);
self
}
pub async fn build(self) -> TestingEnv {
let kv_store = Arc::new(MemStore::new()) as _;
@@ -409,29 +437,6 @@ mod tests {
Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone());
let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence);
let failed_region = self.failed_region.unwrap_or(1);
let failed_datanode = table_global_value
.regions_id_map
.iter()
.find_map(|(datanode_id, regions)| {
if regions.contains(&failed_region) {
Some(*datanode_id)
} else {
None
}
})
.unwrap();
let failed_region = RegionIdent {
cluster_id: 0,
datanode_id: failed_datanode,
table_id: 1,
engine: MITO_ENGINE.to_string(),
region_number: failed_region,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table.to_string(),
};
let selector = self.selector.unwrap_or_else(|| {
let nodes = (1..=table_global_value.regions_id_map.len())
.map(|id| Peer {
@@ -454,8 +459,8 @@ mod tests {
mailbox,
selector,
selector_ctx,
dist_lock: Arc::new(MemLock::default()),
},
failed_region,
heartbeat_receivers,
}
}
@@ -465,21 +470,19 @@ mod tests {
async fn test_region_failover_procedure() {
common_telemetry::init_default_ut_logging();
let TestingEnv {
context,
failed_region,
mut heartbeat_receivers,
} = TestingEnvBuilder::new().build().await;
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let mut procedure = Box::new(RegionFailoverProcedure::new(
failed_region.clone(),
context.clone(),
env.context.clone(),
)) as BoxedProcedure;
let mut failed_datanode = heartbeat_receivers
let mut failed_datanode = env
.heartbeat_receivers
.remove(&failed_region.datanode_id)
.unwrap();
let mailbox_clone = context.mailbox.clone();
let mailbox_clone = env.context.mailbox.clone();
let failed_region_clone = failed_region.clone();
common_runtime::spawn_bg(async move {
let resp = failed_datanode.recv().await.unwrap().unwrap();
@@ -516,8 +519,8 @@ mod tests {
});
let (candidate_tx, mut candidate_rx) = tokio::sync::mpsc::channel(1);
for (datanode_id, mut recv) in heartbeat_receivers.into_iter() {
let mailbox_clone = context.mailbox.clone();
for (datanode_id, mut recv) in env.heartbeat_receivers.into_iter() {
let mailbox_clone = env.context.mailbox.clone();
let failed_region_clone = failed_region.clone();
let candidate_tx = candidate_tx.clone();
common_runtime::spawn_bg(async move {
@@ -575,7 +578,7 @@ mod tests {
schema_name: failed_region.schema.clone(),
table_name: failed_region.table.clone(),
};
let value = table_routes::get_table_global_value(&context.selector_ctx.kv_store, &key)
let value = table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key)
.await
.unwrap()
.unwrap();
@@ -595,18 +598,18 @@ mod tests {
#[tokio::test]
async fn test_state_serde() {
let TestingEnv {
context,
failed_region,
heartbeat_receivers: _,
} = TestingEnvBuilder::new().build().await;
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let state = RegionFailoverStart::new();
let node = Node {
failed_region,
state: Some(Box::new(state)),
};
let procedure = RegionFailoverProcedure { node, context };
let procedure = RegionFailoverProcedure {
node,
context: env.context,
};
let s = procedure.dump().unwrap();
assert_eq!(

View File

@@ -131,30 +131,27 @@ mod tests {
use api::v1::meta::mailbox_message::Payload;
use common_meta::instruction::SimpleReply;
use super::super::tests::{TestingEnv, TestingEnvBuilder};
use super::super::tests::TestingEnvBuilder;
use super::*;
#[tokio::test]
async fn test_activate_region_success() {
common_telemetry::init_default_ut_logging();
let TestingEnv {
context,
failed_region,
mut heartbeat_receivers,
} = TestingEnvBuilder::new().build().await;
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let candidate = 2;
let state = ActivateRegion::new(Peer::new(candidate, ""));
let mailbox_receiver = state
.send_open_region_message(&context, &failed_region, Duration::from_millis(100))
.send_open_region_message(&env.context, &failed_region, Duration::from_millis(100))
.await
.unwrap();
let message_id = mailbox_receiver.message_id();
// verify that the open region message is sent
let rx = heartbeat_receivers.get_mut(&candidate).unwrap();
let rx = env.heartbeat_receivers.get_mut(&candidate).unwrap();
let resp = rx.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(received.id, message_id);
@@ -169,7 +166,7 @@ mod tests {
);
// simulating response from Datanode
context
env.context
.mailbox
.on_recv(
message_id,
@@ -205,21 +202,18 @@ mod tests {
async fn test_activate_region_timeout() {
common_telemetry::init_default_ut_logging();
let TestingEnv {
context,
failed_region,
mut heartbeat_receivers,
} = TestingEnvBuilder::new().build().await;
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let candidate = 2;
let state = ActivateRegion::new(Peer::new(candidate, ""));
let mailbox_receiver = state
.send_open_region_message(&context, &failed_region, Duration::from_millis(100))
.send_open_region_message(&env.context, &failed_region, Duration::from_millis(100))
.await
.unwrap();
// verify that the open region message is sent
let rx = heartbeat_receivers.get_mut(&candidate).unwrap();
let rx = env.heartbeat_receivers.get_mut(&candidate).unwrap();
let resp = rx.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(received.id, mailbox_receiver.message_id());

View File

@@ -136,29 +136,27 @@ mod tests {
use api::v1::meta::mailbox_message::Payload;
use common_meta::instruction::SimpleReply;
use super::super::tests::{TestingEnv, TestingEnvBuilder};
use super::super::tests::TestingEnvBuilder;
use super::*;
#[tokio::test]
async fn test_deactivate_region_success() {
common_telemetry::init_default_ut_logging();
let TestingEnv {
context,
failed_region,
mut heartbeat_receivers,
} = TestingEnvBuilder::new().build().await;
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let state = DeactivateRegion::new(Peer::new(2, ""));
let mailbox_receiver = state
.send_close_region_message(&context, &failed_region, Duration::from_millis(100))
.send_close_region_message(&env.context, &failed_region, Duration::from_millis(100))
.await
.unwrap();
let message_id = mailbox_receiver.message_id();
// verify that the close region message is sent
let rx = heartbeat_receivers
let rx = env
.heartbeat_receivers
.get_mut(&failed_region.datanode_id)
.unwrap();
let resp = rx.recv().await.unwrap().unwrap();
@@ -175,7 +173,7 @@ mod tests {
);
// simulating response from Datanode
context
env.context
.mailbox
.on_recv(
message_id,
@@ -211,20 +209,18 @@ mod tests {
async fn test_deactivate_region_timeout() {
common_telemetry::init_default_ut_logging();
let TestingEnv {
context,
failed_region,
mut heartbeat_receivers,
} = TestingEnvBuilder::new().build().await;
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let state = DeactivateRegion::new(Peer::new(2, ""));
let mailbox_receiver = state
.send_close_region_message(&context, &failed_region, Duration::from_millis(100))
.send_close_region_message(&env.context, &failed_region, Duration::from_millis(100))
.await
.unwrap();
// verify that the open region message is sent
let rx = heartbeat_receivers
let rx = env
.heartbeat_receivers
.get_mut(&failed_region.datanode_id)
.unwrap();
let resp = rx.recv().await.unwrap().unwrap();

View File

@@ -101,30 +101,27 @@ impl State for RegionFailoverStart {
#[cfg(test)]
mod tests {
use super::super::tests::{TestingEnv, TestingEnvBuilder};
use super::super::tests::TestingEnvBuilder;
use super::*;
#[tokio::test]
async fn test_choose_failover_candidate() {
common_telemetry::init_default_ut_logging();
let TestingEnv {
context,
failed_region,
heartbeat_receivers: _,
} = TestingEnvBuilder::new().build().await;
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let mut state = RegionFailoverStart::new();
assert!(state.failover_candidate.is_none());
let candidate = state
.choose_candidate(&context, &failed_region)
.choose_candidate(&env.context, &failed_region)
.await
.unwrap();
assert_ne!(candidate.id, failed_region.datanode_id);
let candidate_again = state
.choose_candidate(&context, &failed_region)
.choose_candidate(&env.context, &failed_region)
.await
.unwrap();
assert_eq!(candidate, candidate_again);

View File

@@ -29,6 +29,8 @@ use crate::error::{
TableRouteConversionSnafu,
};
use crate::keys::TableRouteKey;
use crate::lock::keys::table_metadata_lock_key;
use crate::lock::Opts;
use crate::table_routes;
#[derive(Serialize, Deserialize, Debug)]
@@ -41,13 +43,29 @@ impl UpdateRegionMetadata {
Self { candidate }
}
async fn update_meta(
// TODO(LFC): Update the two table metadata values in a batch atomically.
//
// Though the updating of the two metadata values is guarded by a distributed lock,
// it does not robust enough. For example, the lock lease could be expired in the middle of
// one's updating, letting others to start updating concurrently. For now, we set the lease of
// the distributed lock to 10 seconds, which is long enough here to get the job done.
//
// Maybe we should introduce "version" companion values to these two metadata values, and
// use ETCD transaction request to update them?
/// Updates the metadata of the table. Specifically, the [TableGlobalValue] and [TableRouteValue].
async fn update_metadata(
&self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<()> {
let key = table_metadata_lock_key(failed_region);
let key = ctx.dist_lock.lock(key, Opts::default()).await?;
self.update_table_global_value(ctx, failed_region).await?;
self.update_table_route(ctx, failed_region).await?;
ctx.dist_lock.unlock(key).await?;
Ok(())
}
@@ -183,15 +201,17 @@ impl State for UpdateRegionMetadata {
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
self.update_meta(ctx, failed_region).await.map_err(|e| {
RetryLaterSnafu {
reason: format!(
"Failed to update metadata for failed region: {}, error: {}",
failed_region, e
),
}
.build()
})?;
self.update_metadata(ctx, failed_region)
.await
.map_err(|e| {
RetryLaterSnafu {
reason: format!(
"Failed to update metadata for failed region: {}, error: {}",
failed_region, e
),
}
.build()
})?;
Ok(Box::new(RegionFailoverEnd))
}
}
@@ -209,12 +229,8 @@ mod tests {
async fn test_update_table_global_value() {
common_telemetry::init_default_ut_logging();
async fn test(env: TestingEnv, candidate: u64) -> TableGlobalValue {
let TestingEnv {
context,
failed_region,
heartbeat_receivers: _,
} = env;
async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> TableGlobalValue {
let failed_region = env.failed_region(failed_region).await;
let key = TableGlobalKey {
catalog_name: failed_region.catalog.clone(),
@@ -223,19 +239,19 @@ mod tests {
};
let original =
table_routes::get_table_global_value(&context.selector_ctx.kv_store, &key)
table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key)
.await
.unwrap()
.unwrap();
let state = UpdateRegionMetadata::new(Peer::new(candidate, ""));
state
.update_table_global_value(&context, &failed_region)
.update_table_global_value(&env.context, &failed_region)
.await
.unwrap();
let updated =
table_routes::get_table_global_value(&context.selector_ctx.kv_store, &key)
table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key)
.await
.unwrap()
.unwrap();
@@ -253,8 +269,8 @@ mod tests {
// 3 => 4
// Testing failed region 1 moves to Datanode 2.
let env = TestingEnvBuilder::new().with_failed_region(1).build().await;
let updated = test(env, 2).await;
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 1, 2).await;
let new_region_id_map = updated.regions_id_map;
assert_eq!(new_region_id_map.len(), 3);
@@ -263,8 +279,8 @@ mod tests {
assert_eq!(new_region_id_map.get(&3), Some(&vec![4]));
// Testing failed region 3 moves to Datanode 3.
let env = TestingEnvBuilder::new().with_failed_region(3).build().await;
let updated = test(env, 3).await;
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 3, 3).await;
let new_region_id_map = updated.regions_id_map;
assert_eq!(new_region_id_map.len(), 2);
@@ -272,8 +288,8 @@ mod tests {
assert_eq!(new_region_id_map.get(&3), Some(&vec![4, 3]));
// Testing failed region 1 moves to a new Datanode, 4.
let env = TestingEnvBuilder::new().with_failed_region(1).build().await;
let updated = test(env, 4).await;
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 1, 4).await;
let new_region_id_map = updated.regions_id_map;
assert_eq!(new_region_id_map.len(), 4);
@@ -283,8 +299,8 @@ mod tests {
assert_eq!(new_region_id_map.get(&4), Some(&vec![1]));
// Testing failed region 3 moves to a new Datanode, 4.
let env = TestingEnvBuilder::new().with_failed_region(3).build().await;
let updated = test(env, 4).await;
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 3, 4).await;
let new_region_id_map = updated.regions_id_map;
assert_eq!(new_region_id_map.len(), 3);
@@ -297,16 +313,12 @@ mod tests {
async fn test_update_table_route() {
common_telemetry::init_default_ut_logging();
async fn test(env: TestingEnv, candidate: u64) -> TableRouteValue {
let TestingEnv {
context,
failed_region,
heartbeat_receivers: _,
} = env;
async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> TableRouteValue {
let failed_region = env.failed_region(failed_region).await;
let state = UpdateRegionMetadata::new(Peer::new(candidate, ""));
state
.update_table_route(&context, &failed_region)
.update_table_route(&env.context, &failed_region)
.await
.unwrap();
@@ -316,7 +328,7 @@ mod tests {
schema_name: &failed_region.schema,
table_name: &failed_region.table,
};
table_routes::get_table_route_value(&context.selector_ctx.kv_store, &key)
table_routes::get_table_route_value(&env.context.selector_ctx.kv_store, &key)
.await
.unwrap()
}
@@ -329,8 +341,8 @@ mod tests {
// 4 => 3
// Testing failed region 1 moves to Datanode 2.
let env = TestingEnvBuilder::new().with_failed_region(1).build().await;
let updated = test(env, 2).await;
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 1, 2).await;
let actual = &updated.table_route.as_ref().unwrap().region_routes;
// Expected region routes:
@@ -341,17 +353,17 @@ mod tests {
// 4 => 3
let peers = &updated.peers;
assert_eq!(peers.len(), 3);
let expected = vec![
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 1),
new_region_route(3, peers, 2),
new_region_route(4, peers, 3),
];
assert_eq!(actual, &expected);
assert_eq!(actual, expected);
// Testing failed region 3 moves to Datanode 3.
let env = TestingEnvBuilder::new().with_failed_region(3).build().await;
let updated = test(env, 3).await;
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 3, 3).await;
let actual = &updated.table_route.as_ref().unwrap().region_routes;
// Expected region routes:
@@ -362,17 +374,17 @@ mod tests {
// 4 => 3
let peers = &updated.peers;
assert_eq!(peers.len(), 2);
let expected = vec![
let expected = &vec![
new_region_route(1, peers, 1),
new_region_route(2, peers, 1),
new_region_route(3, peers, 3),
new_region_route(4, peers, 3),
];
assert_eq!(actual, &expected);
assert_eq!(actual, expected);
// Testing failed region 1 moves to a new Datanode, 4.
let env = TestingEnvBuilder::new().with_failed_region(1).build().await;
let updated = test(env, 4).await;
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 1, 4).await;
let actual = &updated.table_route.as_ref().unwrap().region_routes;
// Expected region routes:
@@ -383,17 +395,17 @@ mod tests {
// 4 => 3
let peers = &updated.peers;
assert_eq!(peers.len(), 4);
let expected = vec![
let expected = &vec![
new_region_route(1, peers, 4),
new_region_route(2, peers, 1),
new_region_route(3, peers, 2),
new_region_route(4, peers, 3),
];
assert_eq!(actual, &expected);
assert_eq!(actual, expected);
// Testing failed region 3 moves to a new Datanode, 4.
let env = TestingEnvBuilder::new().with_failed_region(3).build().await;
let updated = test(env, 4).await;
let env = TestingEnvBuilder::new().build().await;
let updated = test(env, 3, 4).await;
let actual = &updated.table_route.as_ref().unwrap().region_routes;
// Expected region routes:
@@ -404,12 +416,112 @@ mod tests {
// 4 => 3
let peers = &updated.peers;
assert_eq!(peers.len(), 3);
let expected = vec![
let expected = &vec![
new_region_route(1, peers, 1),
new_region_route(2, peers, 1),
new_region_route(3, peers, 4),
new_region_route(4, peers, 3),
];
assert_eq!(actual, &expected);
assert_eq!(actual, expected);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_update_metadata_concurrently() {
common_telemetry::init_default_ut_logging();
// Test the correctness of concurrently updating the region distribution in table global
// value, and region routes in table route value. Region 1 moves to Datanode 2; region 2
// moves to Datanode 3.
//
// Datanode => Regions
// Before: | After:
// 1 => 1, 2 |
// 2 => 3 | 2 => 3, 1
// 3 => 4 | 3 => 4, 2
//
// region number => leader node
// Before: | After:
// 1 => 1 | 1 => 2
// 2 => 1 | 2 => 3
// 3 => 2 | 3 => 2
// 4 => 3 | 4 => 3
//
// Test case runs 10 times to enlarge the possibility of concurrent updating.
for _ in 0..10 {
let env = TestingEnvBuilder::new().build().await;
let ctx_1 = env.context.clone();
let ctx_2 = env.context.clone();
let failed_region_1 = env.failed_region(1).await;
let failed_region_2 = env.failed_region(2).await;
let catalog_name = failed_region_1.catalog.clone();
let schema_name = failed_region_1.schema.clone();
let table_name = failed_region_1.table.clone();
let table_id = failed_region_1.table_id as u64;
futures::future::join_all(vec![
tokio::spawn(async move {
let state = UpdateRegionMetadata::new(Peer::new(2, ""));
state
.update_metadata(&ctx_1, &failed_region_1)
.await
.unwrap();
}),
tokio::spawn(async move {
let state = UpdateRegionMetadata::new(Peer::new(3, ""));
state
.update_metadata(&ctx_2, &failed_region_2)
.await
.unwrap();
}),
])
.await;
let table_route_key = TableRouteKey {
table_id,
catalog_name: &catalog_name,
schema_name: &schema_name,
table_name: &table_name,
};
let table_route_value = table_routes::get_table_route_value(
&env.context.selector_ctx.kv_store,
&table_route_key,
)
.await
.unwrap();
let peers = &table_route_value.peers;
let actual = &table_route_value
.table_route
.as_ref()
.unwrap()
.region_routes;
let expected = &vec![
new_region_route(1, peers, 2),
new_region_route(2, peers, 3),
new_region_route(3, peers, 2),
new_region_route(4, peers, 3),
];
assert_eq!(peers.len(), 2);
assert_eq!(actual, expected);
let table_global_key = TableGlobalKey {
catalog_name,
schema_name,
table_name,
};
let table_global_value = table_routes::get_table_global_value(
&env.context.selector_ctx.kv_store,
&table_global_key,
)
.await
.unwrap()
.unwrap();
let map = table_global_value.regions_id_map;
assert_eq!(map.len(), 2);
assert_eq!(map.get(&2), Some(&vec![3, 1]));
assert_eq!(map.get(&3), Some(&vec![4, 2]));
}
}
}

View File

@@ -13,11 +13,9 @@
// limitations under the License.
use api::v1::meta::{lock_server, LockRequest, LockResponse, UnlockRequest, UnlockResponse};
use snafu::OptionExt;
use tonic::{Request, Response};
use super::GrpcResult;
use crate::error;
use crate::lock::Opts;
use crate::metasrv::MetaSrv;
@@ -29,8 +27,7 @@ impl lock_server::Lock for MetaSrv {
} = request.into_inner();
let expire_secs = Some(expire_secs as u64);
let lock = self.lock().context(error::LockNotConfigSnafu)?;
let key = lock.lock(name, Opts { expire_secs }).await?;
let key = self.lock().lock(name, Opts { expire_secs }).await?;
let resp = LockResponse {
key,
@@ -43,8 +40,7 @@ impl lock_server::Lock for MetaSrv {
async fn unlock(&self, request: Request<UnlockRequest>) -> GrpcResult<UnlockResponse> {
let UnlockRequest { key, .. } = request.into_inner();
let lock = self.lock().context(error::LockNotConfigSnafu)?;
let _ = lock.unlock(key).await?;
let _ = self.lock().unlock(key).await?;
let resp = UnlockResponse {
..Default::default()

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use common_procedure::local::{LocalManager, ManagerConfig};
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::lock::memory::MemLock;
use crate::metasrv::SelectorContext;
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
@@ -48,5 +49,6 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
procedure_manager,
selector,
selector_ctx,
Arc::new(MemLock::default()),
))
}

View File

@@ -134,6 +134,7 @@ async fn run_region_failover_procedure(cluster: &GreptimeDbCluster, failed_regio
catalog: None,
schema: None,
},
dist_lock: meta_srv.lock().clone(),
},
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));