diff --git a/src/meta-srv/src/lock.rs b/src/meta-srv/src/lock.rs index 9923ed8945..b4d0a3eafe 100644 --- a/src/meta-srv/src/lock.rs +++ b/src/meta-srv/src/lock.rs @@ -13,6 +13,8 @@ // limitations under the License. pub mod etcd; +pub(crate) mod keys; +pub(crate) mod memory; use std::sync::Arc; @@ -28,6 +30,14 @@ pub struct Opts { pub expire_secs: Option, } +impl Default for Opts { + fn default() -> Self { + Opts { + expire_secs: Some(DEFAULT_EXPIRE_TIME_SECS), + } + } +} + #[async_trait::async_trait] pub trait DistLock: Send + Sync { // Lock acquires a distributed shared lock on a given named lock. On success, it diff --git a/src/meta-srv/src/lock/keys.rs b/src/meta-srv/src/lock/keys.rs new file mode 100644 index 0000000000..b1b87244bf --- /dev/null +++ b/src/meta-srv/src/lock/keys.rs @@ -0,0 +1,28 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! All keys used for distributed locking in the Metasrv. +//! Place them in this unified module for better maintenance. + +use common_meta::RegionIdent; + +use crate::lock::Key; + +pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key { + format!( + "table_metadata_lock_({}-{}.{}.{}-{})", + region.cluster_id, region.catalog, region.schema, region.table, region.table_id, + ) + .into_bytes() +} diff --git a/src/meta-srv/src/lock/memory.rs b/src/meta-srv/src/lock/memory.rs new file mode 100644 index 0000000000..72637700f1 --- /dev/null +++ b/src/meta-srv/src/lock/memory.rs @@ -0,0 +1,112 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use dashmap::DashMap; +use tokio::sync::{Mutex, OwnedMutexGuard}; + +use crate::error::Result; +use crate::lock::{DistLock, Key, Opts}; + +#[derive(Default)] +pub(crate) struct MemLock { + mutexes: DashMap>>, + guards: DashMap>, +} + +#[async_trait] +impl DistLock for MemLock { + async fn lock(&self, key: Vec, _opts: Opts) -> Result { + let mutex = self + .mutexes + .entry(key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone(); + + let guard = mutex.lock_owned().await; + + self.guards.insert(key.clone(), guard); + Ok(key) + } + + async fn unlock(&self, key: Vec) -> Result<()> { + // drop the guard, so that the mutex can be unlocked, + // effectively make the `mutex.lock_owned` in `lock` method to proceed + self.guards.remove(&key); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::atomic::{AtomicU32, Ordering}; + + use rand::seq::SliceRandom; + + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn test_mem_lock_concurrently() { + let lock = Arc::new(MemLock::default()); + + let keys = (0..10) + .map(|i| format!("my-lock-{i}").into_bytes()) + .collect::>(); + let counters: [(Key, AtomicU32); 10] = keys + .iter() + .map(|x| (x.clone(), AtomicU32::new(0))) + .collect::>() + .try_into() + .unwrap(); + let counters = Arc::new(HashMap::from(counters)); + + let tasks = (0..100) + .map(|_| { + let mut keys = keys.clone(); + keys.shuffle(&mut rand::thread_rng()); + + let lock_clone = lock.clone(); + let counters_clone = counters.clone(); + tokio::spawn(async move { + // every key counter will be added by 1 for 10 times + for i in 0..100 { + let key = &keys[i % keys.len()]; + lock_clone + .lock(key.clone(), Opts { expire_secs: None }) + .await + .unwrap(); + + // Intentionally create a critical section: + // if our MemLock is flawed, the resulting counter is wrong. + // + // Note that AtomicU32 is only used to enable the updates from multiple tasks, + // does not make any guarantee about the correctness of the result. + + let counter = counters_clone.get(key).unwrap(); + let v = counter.load(Ordering::Relaxed); + counter.store(v + 1, Ordering::Relaxed); + + lock_clone.unlock(key.clone()).await.unwrap(); + } + }) + }) + .collect::>(); + futures::future::join_all(tasks).await; + + assert!(counters.values().all(|x| x.load(Ordering::Relaxed) == 1000)); + } +} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 66635b27f8..64e4117aa6 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -120,7 +120,7 @@ pub struct MetaSrv { handler_group: HeartbeatHandlerGroup, election: Option, meta_peer_client: Option, - lock: Option, + lock: DistLockRef, procedure_manager: ProcedureManagerRef, metadata_service: MetadataServiceRef, mailbox: MailboxRef, @@ -244,8 +244,8 @@ impl MetaSrv { } #[inline] - pub fn lock(&self) -> Option { - self.lock.clone() + pub fn lock(&self) -> &DistLockRef { + &self.lock } #[inline] diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ac41100545..5b89dca3e3 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -25,6 +25,7 @@ use crate::handler::{ KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler, ResponseHeaderHandler, }; +use crate::lock::memory::MemLock; use crate::lock::DistLockRef; use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef}; use crate::metasrv::{ @@ -140,6 +141,8 @@ impl MetaSrvBuilder { let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); + let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); + let handler_group = match handler_group { Some(handler_group) => handler_group, None => { @@ -154,6 +157,7 @@ impl MetaSrvBuilder { catalog: None, schema: None, }, + lock.clone(), )); let region_failure_handler = diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 49fb17c9ec..167ba1c9b3 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -38,6 +38,7 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::error::{Error, RegisterProcedureLoaderSnafu, Result}; +use crate::lock::DistLockRef; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::service::mailbox::MailboxRef; @@ -49,6 +50,7 @@ pub(crate) struct RegionFailoverManager { procedure_manager: ProcedureManagerRef, selector: SelectorRef, selector_ctx: SelectorContext, + dist_lock: DistLockRef, running_procedures: Arc>>, } @@ -72,33 +74,35 @@ impl RegionFailoverManager { procedure_manager: ProcedureManagerRef, selector: SelectorRef, selector_ctx: SelectorContext, + dist_lock: DistLockRef, ) -> Self { Self { mailbox, procedure_manager, selector, selector_ctx, + dist_lock, running_procedures: Arc::new(Mutex::new(HashSet::new())), } } + fn create_context(&self) -> RegionFailoverContext { + RegionFailoverContext { + mailbox: self.mailbox.clone(), + selector: self.selector.clone(), + selector_ctx: self.selector_ctx.clone(), + dist_lock: self.dist_lock.clone(), + } + } + pub(crate) fn try_start(&self) -> Result<()> { - let mailbox = self.mailbox.clone(); - let selector = self.selector.clone(); - let selector_ctx = self.selector_ctx.clone(); + let context = self.create_context(); self.procedure_manager .register_loader( RegionFailoverProcedure::TYPE_NAME, Box::new(move |json| { - RegionFailoverProcedure::from_json( - json, - RegionFailoverContext { - mailbox: mailbox.clone(), - selector: selector.clone(), - selector_ctx: selector_ctx.clone(), - }, - ) - .map(|p| Box::new(p) as _) + let context = context.clone(); + RegionFailoverProcedure::from_json(json, context).map(|p| Box::new(p) as _) }), ) .context(RegisterProcedureLoaderSnafu { @@ -120,14 +124,8 @@ impl RegionFailoverManager { return; } - let procedure = RegionFailoverProcedure::new( - failed_region.clone(), - RegionFailoverContext { - mailbox: self.mailbox.clone(), - selector: self.selector.clone(), - selector_ctx: self.selector_ctx.clone(), - }, - ); + let context = self.create_context(); + let procedure = RegionFailoverProcedure::new(failed_region.clone(), context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); let procedure_id = procedure_with_id.id; info!("Starting region failover procedure {procedure_id} for region {failed_region:?}"); @@ -172,6 +170,7 @@ pub struct RegionFailoverContext { pub mailbox: MailboxRef, pub selector: SelectorRef, pub selector_ctx: SelectorContext, + pub dist_lock: DistLockRef, } /// The state machine of region failover procedure. Driven by the call to `next`. @@ -316,6 +315,7 @@ mod tests { use super::*; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; + use crate::lock::memory::MemLock; use crate::selector::{Namespace, Selector}; use crate::sequence::Sequence; use crate::service::mailbox::Channel; @@ -356,21 +356,54 @@ mod tests { pub struct TestingEnv { pub context: RegionFailoverContext, - pub failed_region: RegionIdent, pub heartbeat_receivers: HashMap>>, } + impl TestingEnv { + pub async fn failed_region(&self, region_number: u32) -> RegionIdent { + let table = "my_table"; + let key = TableGlobalKey { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table.to_string(), + }; + let value = + table_routes::get_table_global_value(&self.context.selector_ctx.kv_store, &key) + .await + .unwrap() + .unwrap(); + + let failed_datanode = value + .regions_id_map + .iter() + .find_map(|(&datanode_id, regions)| { + if regions.contains(®ion_number) { + Some(datanode_id) + } else { + None + } + }) + .unwrap(); + RegionIdent { + cluster_id: 0, + datanode_id: failed_datanode, + table_id: 1, + engine: MITO_ENGINE.to_string(), + region_number, + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: table.to_string(), + } + } + } + pub struct TestingEnvBuilder { selector: Option, - failed_region: Option, } impl TestingEnvBuilder { pub fn new() -> Self { - Self { - selector: None, - failed_region: None, - } + Self { selector: None } } #[allow(unused)] @@ -379,11 +412,6 @@ mod tests { self } - pub fn with_failed_region(mut self, failed_region: u32) -> Self { - self.failed_region = Some(failed_region); - self - } - pub async fn build(self) -> TestingEnv { let kv_store = Arc::new(MemStore::new()) as _; @@ -409,29 +437,6 @@ mod tests { Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone()); let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence); - let failed_region = self.failed_region.unwrap_or(1); - let failed_datanode = table_global_value - .regions_id_map - .iter() - .find_map(|(datanode_id, regions)| { - if regions.contains(&failed_region) { - Some(*datanode_id) - } else { - None - } - }) - .unwrap(); - let failed_region = RegionIdent { - cluster_id: 0, - datanode_id: failed_datanode, - table_id: 1, - engine: MITO_ENGINE.to_string(), - region_number: failed_region, - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: table.to_string(), - }; - let selector = self.selector.unwrap_or_else(|| { let nodes = (1..=table_global_value.regions_id_map.len()) .map(|id| Peer { @@ -454,8 +459,8 @@ mod tests { mailbox, selector, selector_ctx, + dist_lock: Arc::new(MemLock::default()), }, - failed_region, heartbeat_receivers, } } @@ -465,21 +470,19 @@ mod tests { async fn test_region_failover_procedure() { common_telemetry::init_default_ut_logging(); - let TestingEnv { - context, - failed_region, - mut heartbeat_receivers, - } = TestingEnvBuilder::new().build().await; + let mut env = TestingEnvBuilder::new().build().await; + let failed_region = env.failed_region(1).await; let mut procedure = Box::new(RegionFailoverProcedure::new( failed_region.clone(), - context.clone(), + env.context.clone(), )) as BoxedProcedure; - let mut failed_datanode = heartbeat_receivers + let mut failed_datanode = env + .heartbeat_receivers .remove(&failed_region.datanode_id) .unwrap(); - let mailbox_clone = context.mailbox.clone(); + let mailbox_clone = env.context.mailbox.clone(); let failed_region_clone = failed_region.clone(); common_runtime::spawn_bg(async move { let resp = failed_datanode.recv().await.unwrap().unwrap(); @@ -516,8 +519,8 @@ mod tests { }); let (candidate_tx, mut candidate_rx) = tokio::sync::mpsc::channel(1); - for (datanode_id, mut recv) in heartbeat_receivers.into_iter() { - let mailbox_clone = context.mailbox.clone(); + for (datanode_id, mut recv) in env.heartbeat_receivers.into_iter() { + let mailbox_clone = env.context.mailbox.clone(); let failed_region_clone = failed_region.clone(); let candidate_tx = candidate_tx.clone(); common_runtime::spawn_bg(async move { @@ -575,7 +578,7 @@ mod tests { schema_name: failed_region.schema.clone(), table_name: failed_region.table.clone(), }; - let value = table_routes::get_table_global_value(&context.selector_ctx.kv_store, &key) + let value = table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key) .await .unwrap() .unwrap(); @@ -595,18 +598,18 @@ mod tests { #[tokio::test] async fn test_state_serde() { - let TestingEnv { - context, - failed_region, - heartbeat_receivers: _, - } = TestingEnvBuilder::new().build().await; + let env = TestingEnvBuilder::new().build().await; + let failed_region = env.failed_region(1).await; let state = RegionFailoverStart::new(); let node = Node { failed_region, state: Some(Box::new(state)), }; - let procedure = RegionFailoverProcedure { node, context }; + let procedure = RegionFailoverProcedure { + node, + context: env.context, + }; let s = procedure.dump().unwrap(); assert_eq!( diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index 761a174918..66b8bef58a 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -131,30 +131,27 @@ mod tests { use api::v1::meta::mailbox_message::Payload; use common_meta::instruction::SimpleReply; - use super::super::tests::{TestingEnv, TestingEnvBuilder}; + use super::super::tests::TestingEnvBuilder; use super::*; #[tokio::test] async fn test_activate_region_success() { common_telemetry::init_default_ut_logging(); - let TestingEnv { - context, - failed_region, - mut heartbeat_receivers, - } = TestingEnvBuilder::new().build().await; + let mut env = TestingEnvBuilder::new().build().await; + let failed_region = env.failed_region(1).await; let candidate = 2; let state = ActivateRegion::new(Peer::new(candidate, "")); let mailbox_receiver = state - .send_open_region_message(&context, &failed_region, Duration::from_millis(100)) + .send_open_region_message(&env.context, &failed_region, Duration::from_millis(100)) .await .unwrap(); let message_id = mailbox_receiver.message_id(); // verify that the open region message is sent - let rx = heartbeat_receivers.get_mut(&candidate).unwrap(); + let rx = env.heartbeat_receivers.get_mut(&candidate).unwrap(); let resp = rx.recv().await.unwrap().unwrap(); let received = &resp.mailbox_message.unwrap(); assert_eq!(received.id, message_id); @@ -169,7 +166,7 @@ mod tests { ); // simulating response from Datanode - context + env.context .mailbox .on_recv( message_id, @@ -205,21 +202,18 @@ mod tests { async fn test_activate_region_timeout() { common_telemetry::init_default_ut_logging(); - let TestingEnv { - context, - failed_region, - mut heartbeat_receivers, - } = TestingEnvBuilder::new().build().await; + let mut env = TestingEnvBuilder::new().build().await; + let failed_region = env.failed_region(1).await; let candidate = 2; let state = ActivateRegion::new(Peer::new(candidate, "")); let mailbox_receiver = state - .send_open_region_message(&context, &failed_region, Duration::from_millis(100)) + .send_open_region_message(&env.context, &failed_region, Duration::from_millis(100)) .await .unwrap(); // verify that the open region message is sent - let rx = heartbeat_receivers.get_mut(&candidate).unwrap(); + let rx = env.heartbeat_receivers.get_mut(&candidate).unwrap(); let resp = rx.recv().await.unwrap().unwrap(); let received = &resp.mailbox_message.unwrap(); assert_eq!(received.id, mailbox_receiver.message_id()); diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index 49b1095764..b24e188c05 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -136,29 +136,27 @@ mod tests { use api::v1::meta::mailbox_message::Payload; use common_meta::instruction::SimpleReply; - use super::super::tests::{TestingEnv, TestingEnvBuilder}; + use super::super::tests::TestingEnvBuilder; use super::*; #[tokio::test] async fn test_deactivate_region_success() { common_telemetry::init_default_ut_logging(); - let TestingEnv { - context, - failed_region, - mut heartbeat_receivers, - } = TestingEnvBuilder::new().build().await; + let mut env = TestingEnvBuilder::new().build().await; + let failed_region = env.failed_region(1).await; let state = DeactivateRegion::new(Peer::new(2, "")); let mailbox_receiver = state - .send_close_region_message(&context, &failed_region, Duration::from_millis(100)) + .send_close_region_message(&env.context, &failed_region, Duration::from_millis(100)) .await .unwrap(); let message_id = mailbox_receiver.message_id(); // verify that the close region message is sent - let rx = heartbeat_receivers + let rx = env + .heartbeat_receivers .get_mut(&failed_region.datanode_id) .unwrap(); let resp = rx.recv().await.unwrap().unwrap(); @@ -175,7 +173,7 @@ mod tests { ); // simulating response from Datanode - context + env.context .mailbox .on_recv( message_id, @@ -211,20 +209,18 @@ mod tests { async fn test_deactivate_region_timeout() { common_telemetry::init_default_ut_logging(); - let TestingEnv { - context, - failed_region, - mut heartbeat_receivers, - } = TestingEnvBuilder::new().build().await; + let mut env = TestingEnvBuilder::new().build().await; + let failed_region = env.failed_region(1).await; let state = DeactivateRegion::new(Peer::new(2, "")); let mailbox_receiver = state - .send_close_region_message(&context, &failed_region, Duration::from_millis(100)) + .send_close_region_message(&env.context, &failed_region, Duration::from_millis(100)) .await .unwrap(); // verify that the open region message is sent - let rx = heartbeat_receivers + let rx = env + .heartbeat_receivers .get_mut(&failed_region.datanode_id) .unwrap(); let resp = rx.recv().await.unwrap().unwrap(); diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs index f9ecde0e29..ca2c00d05c 100644 --- a/src/meta-srv/src/procedure/region_failover/failover_start.rs +++ b/src/meta-srv/src/procedure/region_failover/failover_start.rs @@ -101,30 +101,27 @@ impl State for RegionFailoverStart { #[cfg(test)] mod tests { - use super::super::tests::{TestingEnv, TestingEnvBuilder}; + use super::super::tests::TestingEnvBuilder; use super::*; #[tokio::test] async fn test_choose_failover_candidate() { common_telemetry::init_default_ut_logging(); - let TestingEnv { - context, - failed_region, - heartbeat_receivers: _, - } = TestingEnvBuilder::new().build().await; + let env = TestingEnvBuilder::new().build().await; + let failed_region = env.failed_region(1).await; let mut state = RegionFailoverStart::new(); assert!(state.failover_candidate.is_none()); let candidate = state - .choose_candidate(&context, &failed_region) + .choose_candidate(&env.context, &failed_region) .await .unwrap(); assert_ne!(candidate.id, failed_region.datanode_id); let candidate_again = state - .choose_candidate(&context, &failed_region) + .choose_candidate(&env.context, &failed_region) .await .unwrap(); assert_eq!(candidate, candidate_again); diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 42dce8b427..731f7df18d 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -29,6 +29,8 @@ use crate::error::{ TableRouteConversionSnafu, }; use crate::keys::TableRouteKey; +use crate::lock::keys::table_metadata_lock_key; +use crate::lock::Opts; use crate::table_routes; #[derive(Serialize, Deserialize, Debug)] @@ -41,13 +43,29 @@ impl UpdateRegionMetadata { Self { candidate } } - async fn update_meta( + // TODO(LFC): Update the two table metadata values in a batch atomically. + // + // Though the updating of the two metadata values is guarded by a distributed lock, + // it does not robust enough. For example, the lock lease could be expired in the middle of + // one's updating, letting others to start updating concurrently. For now, we set the lease of + // the distributed lock to 10 seconds, which is long enough here to get the job done. + // + // Maybe we should introduce "version" companion values to these two metadata values, and + // use ETCD transaction request to update them? + + /// Updates the metadata of the table. Specifically, the [TableGlobalValue] and [TableRouteValue]. + async fn update_metadata( &self, ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result<()> { + let key = table_metadata_lock_key(failed_region); + let key = ctx.dist_lock.lock(key, Opts::default()).await?; + self.update_table_global_value(ctx, failed_region).await?; self.update_table_route(ctx, failed_region).await?; + + ctx.dist_lock.unlock(key).await?; Ok(()) } @@ -183,15 +201,17 @@ impl State for UpdateRegionMetadata { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result> { - self.update_meta(ctx, failed_region).await.map_err(|e| { - RetryLaterSnafu { - reason: format!( - "Failed to update metadata for failed region: {}, error: {}", - failed_region, e - ), - } - .build() - })?; + self.update_metadata(ctx, failed_region) + .await + .map_err(|e| { + RetryLaterSnafu { + reason: format!( + "Failed to update metadata for failed region: {}, error: {}", + failed_region, e + ), + } + .build() + })?; Ok(Box::new(RegionFailoverEnd)) } } @@ -209,12 +229,8 @@ mod tests { async fn test_update_table_global_value() { common_telemetry::init_default_ut_logging(); - async fn test(env: TestingEnv, candidate: u64) -> TableGlobalValue { - let TestingEnv { - context, - failed_region, - heartbeat_receivers: _, - } = env; + async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> TableGlobalValue { + let failed_region = env.failed_region(failed_region).await; let key = TableGlobalKey { catalog_name: failed_region.catalog.clone(), @@ -223,19 +239,19 @@ mod tests { }; let original = - table_routes::get_table_global_value(&context.selector_ctx.kv_store, &key) + table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key) .await .unwrap() .unwrap(); let state = UpdateRegionMetadata::new(Peer::new(candidate, "")); state - .update_table_global_value(&context, &failed_region) + .update_table_global_value(&env.context, &failed_region) .await .unwrap(); let updated = - table_routes::get_table_global_value(&context.selector_ctx.kv_store, &key) + table_routes::get_table_global_value(&env.context.selector_ctx.kv_store, &key) .await .unwrap() .unwrap(); @@ -253,8 +269,8 @@ mod tests { // 3 => 4 // Testing failed region 1 moves to Datanode 2. - let env = TestingEnvBuilder::new().with_failed_region(1).build().await; - let updated = test(env, 2).await; + let env = TestingEnvBuilder::new().build().await; + let updated = test(env, 1, 2).await; let new_region_id_map = updated.regions_id_map; assert_eq!(new_region_id_map.len(), 3); @@ -263,8 +279,8 @@ mod tests { assert_eq!(new_region_id_map.get(&3), Some(&vec![4])); // Testing failed region 3 moves to Datanode 3. - let env = TestingEnvBuilder::new().with_failed_region(3).build().await; - let updated = test(env, 3).await; + let env = TestingEnvBuilder::new().build().await; + let updated = test(env, 3, 3).await; let new_region_id_map = updated.regions_id_map; assert_eq!(new_region_id_map.len(), 2); @@ -272,8 +288,8 @@ mod tests { assert_eq!(new_region_id_map.get(&3), Some(&vec![4, 3])); // Testing failed region 1 moves to a new Datanode, 4. - let env = TestingEnvBuilder::new().with_failed_region(1).build().await; - let updated = test(env, 4).await; + let env = TestingEnvBuilder::new().build().await; + let updated = test(env, 1, 4).await; let new_region_id_map = updated.regions_id_map; assert_eq!(new_region_id_map.len(), 4); @@ -283,8 +299,8 @@ mod tests { assert_eq!(new_region_id_map.get(&4), Some(&vec![1])); // Testing failed region 3 moves to a new Datanode, 4. - let env = TestingEnvBuilder::new().with_failed_region(3).build().await; - let updated = test(env, 4).await; + let env = TestingEnvBuilder::new().build().await; + let updated = test(env, 3, 4).await; let new_region_id_map = updated.regions_id_map; assert_eq!(new_region_id_map.len(), 3); @@ -297,16 +313,12 @@ mod tests { async fn test_update_table_route() { common_telemetry::init_default_ut_logging(); - async fn test(env: TestingEnv, candidate: u64) -> TableRouteValue { - let TestingEnv { - context, - failed_region, - heartbeat_receivers: _, - } = env; + async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> TableRouteValue { + let failed_region = env.failed_region(failed_region).await; let state = UpdateRegionMetadata::new(Peer::new(candidate, "")); state - .update_table_route(&context, &failed_region) + .update_table_route(&env.context, &failed_region) .await .unwrap(); @@ -316,7 +328,7 @@ mod tests { schema_name: &failed_region.schema, table_name: &failed_region.table, }; - table_routes::get_table_route_value(&context.selector_ctx.kv_store, &key) + table_routes::get_table_route_value(&env.context.selector_ctx.kv_store, &key) .await .unwrap() } @@ -329,8 +341,8 @@ mod tests { // 4 => 3 // Testing failed region 1 moves to Datanode 2. - let env = TestingEnvBuilder::new().with_failed_region(1).build().await; - let updated = test(env, 2).await; + let env = TestingEnvBuilder::new().build().await; + let updated = test(env, 1, 2).await; let actual = &updated.table_route.as_ref().unwrap().region_routes; // Expected region routes: @@ -341,17 +353,17 @@ mod tests { // 4 => 3 let peers = &updated.peers; assert_eq!(peers.len(), 3); - let expected = vec![ + let expected = &vec![ new_region_route(1, peers, 2), new_region_route(2, peers, 1), new_region_route(3, peers, 2), new_region_route(4, peers, 3), ]; - assert_eq!(actual, &expected); + assert_eq!(actual, expected); // Testing failed region 3 moves to Datanode 3. - let env = TestingEnvBuilder::new().with_failed_region(3).build().await; - let updated = test(env, 3).await; + let env = TestingEnvBuilder::new().build().await; + let updated = test(env, 3, 3).await; let actual = &updated.table_route.as_ref().unwrap().region_routes; // Expected region routes: @@ -362,17 +374,17 @@ mod tests { // 4 => 3 let peers = &updated.peers; assert_eq!(peers.len(), 2); - let expected = vec![ + let expected = &vec![ new_region_route(1, peers, 1), new_region_route(2, peers, 1), new_region_route(3, peers, 3), new_region_route(4, peers, 3), ]; - assert_eq!(actual, &expected); + assert_eq!(actual, expected); // Testing failed region 1 moves to a new Datanode, 4. - let env = TestingEnvBuilder::new().with_failed_region(1).build().await; - let updated = test(env, 4).await; + let env = TestingEnvBuilder::new().build().await; + let updated = test(env, 1, 4).await; let actual = &updated.table_route.as_ref().unwrap().region_routes; // Expected region routes: @@ -383,17 +395,17 @@ mod tests { // 4 => 3 let peers = &updated.peers; assert_eq!(peers.len(), 4); - let expected = vec![ + let expected = &vec![ new_region_route(1, peers, 4), new_region_route(2, peers, 1), new_region_route(3, peers, 2), new_region_route(4, peers, 3), ]; - assert_eq!(actual, &expected); + assert_eq!(actual, expected); // Testing failed region 3 moves to a new Datanode, 4. - let env = TestingEnvBuilder::new().with_failed_region(3).build().await; - let updated = test(env, 4).await; + let env = TestingEnvBuilder::new().build().await; + let updated = test(env, 3, 4).await; let actual = &updated.table_route.as_ref().unwrap().region_routes; // Expected region routes: @@ -404,12 +416,112 @@ mod tests { // 4 => 3 let peers = &updated.peers; assert_eq!(peers.len(), 3); - let expected = vec![ + let expected = &vec![ new_region_route(1, peers, 1), new_region_route(2, peers, 1), new_region_route(3, peers, 4), new_region_route(4, peers, 3), ]; - assert_eq!(actual, &expected); + assert_eq!(actual, expected); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_update_metadata_concurrently() { + common_telemetry::init_default_ut_logging(); + + // Test the correctness of concurrently updating the region distribution in table global + // value, and region routes in table route value. Region 1 moves to Datanode 2; region 2 + // moves to Datanode 3. + // + // Datanode => Regions + // Before: | After: + // 1 => 1, 2 | + // 2 => 3 | 2 => 3, 1 + // 3 => 4 | 3 => 4, 2 + // + // region number => leader node + // Before: | After: + // 1 => 1 | 1 => 2 + // 2 => 1 | 2 => 3 + // 3 => 2 | 3 => 2 + // 4 => 3 | 4 => 3 + // + // Test case runs 10 times to enlarge the possibility of concurrent updating. + for _ in 0..10 { + let env = TestingEnvBuilder::new().build().await; + + let ctx_1 = env.context.clone(); + let ctx_2 = env.context.clone(); + + let failed_region_1 = env.failed_region(1).await; + let failed_region_2 = env.failed_region(2).await; + + let catalog_name = failed_region_1.catalog.clone(); + let schema_name = failed_region_1.schema.clone(); + let table_name = failed_region_1.table.clone(); + let table_id = failed_region_1.table_id as u64; + + futures::future::join_all(vec![ + tokio::spawn(async move { + let state = UpdateRegionMetadata::new(Peer::new(2, "")); + state + .update_metadata(&ctx_1, &failed_region_1) + .await + .unwrap(); + }), + tokio::spawn(async move { + let state = UpdateRegionMetadata::new(Peer::new(3, "")); + state + .update_metadata(&ctx_2, &failed_region_2) + .await + .unwrap(); + }), + ]) + .await; + + let table_route_key = TableRouteKey { + table_id, + catalog_name: &catalog_name, + schema_name: &schema_name, + table_name: &table_name, + }; + let table_route_value = table_routes::get_table_route_value( + &env.context.selector_ctx.kv_store, + &table_route_key, + ) + .await + .unwrap(); + let peers = &table_route_value.peers; + let actual = &table_route_value + .table_route + .as_ref() + .unwrap() + .region_routes; + let expected = &vec![ + new_region_route(1, peers, 2), + new_region_route(2, peers, 3), + new_region_route(3, peers, 2), + new_region_route(4, peers, 3), + ]; + assert_eq!(peers.len(), 2); + assert_eq!(actual, expected); + + let table_global_key = TableGlobalKey { + catalog_name, + schema_name, + table_name, + }; + let table_global_value = table_routes::get_table_global_value( + &env.context.selector_ctx.kv_store, + &table_global_key, + ) + .await + .unwrap() + .unwrap(); + let map = table_global_value.regions_id_map; + assert_eq!(map.len(), 2); + assert_eq!(map.get(&2), Some(&vec![3, 1])); + assert_eq!(map.get(&3), Some(&vec![4, 2])); + } } } diff --git a/src/meta-srv/src/service/lock.rs b/src/meta-srv/src/service/lock.rs index 3be36152cf..81f218027e 100644 --- a/src/meta-srv/src/service/lock.rs +++ b/src/meta-srv/src/service/lock.rs @@ -13,11 +13,9 @@ // limitations under the License. use api::v1::meta::{lock_server, LockRequest, LockResponse, UnlockRequest, UnlockResponse}; -use snafu::OptionExt; use tonic::{Request, Response}; use super::GrpcResult; -use crate::error; use crate::lock::Opts; use crate::metasrv::MetaSrv; @@ -29,8 +27,7 @@ impl lock_server::Lock for MetaSrv { } = request.into_inner(); let expire_secs = Some(expire_secs as u64); - let lock = self.lock().context(error::LockNotConfigSnafu)?; - let key = lock.lock(name, Opts { expire_secs }).await?; + let key = self.lock().lock(name, Opts { expire_secs }).await?; let resp = LockResponse { key, @@ -43,8 +40,7 @@ impl lock_server::Lock for MetaSrv { async fn unlock(&self, request: Request) -> GrpcResult { let UnlockRequest { key, .. } = request.into_inner(); - let lock = self.lock().context(error::LockNotConfigSnafu)?; - let _ = lock.unlock(key).await?; + let _ = self.lock().unlock(key).await?; let resp = UnlockResponse { ..Default::default() diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index c3a3a575d2..06737e3528 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_procedure::local::{LocalManager, ManagerConfig}; use crate::handler::{HeartbeatMailbox, Pushers}; +use crate::lock::memory::MemLock; use crate::metasrv::SelectorContext; use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; @@ -48,5 +49,6 @@ pub(crate) fn create_region_failover_manager() -> Arc { procedure_manager, selector, selector_ctx, + Arc::new(MemLock::default()), )) } diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 706fdfd1a6..83af81f2c2 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -134,6 +134,7 @@ async fn run_region_failover_procedure(cluster: &GreptimeDbCluster, failed_regio catalog: None, schema: None, }, + dist_lock: meta_srv.lock().clone(), }, ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));