From 856c0280f53d3d70ee6f5b0e71568f549f6a5479 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sat, 12 Oct 2024 17:04:22 +0800 Subject: [PATCH] feat: remove the distributed lock (#4825) * feat: remove the distributed lock as we do not need it any more * chore: delete todo comment * chore: remove unused error --- src/meta-client/examples/lock.rs | 129 -------------------- src/meta-client/src/client.rs | 35 ------ src/meta-client/src/client/lock.rs | 178 ---------------------------- src/meta-srv/src/bootstrap.rs | 22 +--- src/meta-srv/src/error.rs | 27 ----- src/meta-srv/src/lib.rs | 1 - src/meta-srv/src/lock.rs | 99 ---------------- src/meta-srv/src/lock/etcd.rs | 93 --------------- src/meta-srv/src/lock/memory.rs | 112 ----------------- src/meta-srv/src/metasrv.rs | 6 - src/meta-srv/src/metasrv/builder.rs | 12 -- src/meta-srv/src/service.rs | 1 - src/meta-srv/src/service/lock.rs | 51 -------- 13 files changed, 4 insertions(+), 762 deletions(-) delete mode 100644 src/meta-client/examples/lock.rs delete mode 100644 src/meta-client/src/client/lock.rs delete mode 100644 src/meta-srv/src/lock.rs delete mode 100644 src/meta-srv/src/lock/etcd.rs delete mode 100644 src/meta-srv/src/lock/memory.rs delete mode 100644 src/meta-srv/src/service/lock.rs diff --git a/src/meta-client/examples/lock.rs b/src/meta-client/examples/lock.rs deleted file mode 100644 index c8a8b61d60..0000000000 --- a/src/meta-client/examples/lock.rs +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; -use std::time::Duration; - -use api::v1::meta::Role; -use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::rpc::lock::{LockRequest, UnlockRequest}; -use meta_client::client::MetaClientBuilder; -use meta_client::MetaClientRef; -use tracing::{info, subscriber}; -use tracing_subscriber::FmtSubscriber; - -fn main() { - subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap(); - run(); -} - -#[tokio::main] -async fn run() { - let id = (1000u64, 2000u64); - let config = ChannelConfig::new() - .timeout(Duration::from_secs(30)) - .connect_timeout(Duration::from_secs(5)) - .tcp_nodelay(true); - let channel_manager = ChannelManager::with_config(config); - let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode) - .enable_lock() - .channel_manager(channel_manager) - .build(); - meta_client.start(&["127.0.0.1:3002"]).await.unwrap(); - let meta_client = Arc::new(meta_client); - - run_normal(meta_client.clone()).await; - - run_multi_thread(meta_client.clone()).await; - - run_multi_thread_with_one_timeout(meta_client).await; -} - -async fn run_normal(meta_client: MetaClientRef) { - let name = "lock_name".as_bytes().to_vec(); - let expire_secs = 60; - - let lock_req = LockRequest { name, expire_secs }; - - let lock_result = meta_client.lock(lock_req).await.unwrap(); - let key = lock_result.key; - info!( - "lock success! Returned key: {}", - String::from_utf8(key.clone()).unwrap() - ); - - // It is recommended that time of holding lock is less than the timeout of the grpc channel - info!("do some work, take 3 seconds"); - tokio::time::sleep(Duration::from_secs(3)).await; - - let unlock_req = UnlockRequest { key }; - - meta_client.unlock(unlock_req).await.unwrap(); - info!("unlock success!"); -} - -async fn run_multi_thread(meta_client: MetaClientRef) { - let meta_client_clone = meta_client.clone(); - let join1 = tokio::spawn(async move { - run_normal(meta_client_clone.clone()).await; - }); - - tokio::time::sleep(Duration::from_secs(1)).await; - - let join2 = tokio::spawn(async move { - run_normal(meta_client).await; - }); - - join1.await.unwrap(); - join2.await.unwrap(); -} - -async fn run_multi_thread_with_one_timeout(meta_client: MetaClientRef) { - let meta_client_clone = meta_client.clone(); - let join1 = tokio::spawn(async move { - run_with_timeout(meta_client_clone.clone()).await; - }); - - tokio::time::sleep(Duration::from_secs(1)).await; - - let join2 = tokio::spawn(async move { - run_normal(meta_client).await; - }); - - join1.await.unwrap(); - join2.await.unwrap(); -} - -async fn run_with_timeout(meta_client: MetaClientRef) { - let name = "lock_name".as_bytes().to_vec(); - let expire_secs = 5; - - let lock_req = LockRequest { name, expire_secs }; - - let lock_result = meta_client.lock(lock_req).await.unwrap(); - let key = lock_result.key; - info!( - "lock success! Returned key: {}", - String::from_utf8(key.clone()).unwrap() - ); - - // It is recommended that time of holding lock is less than the timeout of the grpc channel - info!("do some work, take 20 seconds"); - tokio::time::sleep(Duration::from_secs(20)).await; - - let unlock_req = UnlockRequest { key }; - - meta_client.unlock(unlock_req).await.unwrap(); - info!("unlock success!"); -} diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 1c5b96a684..ed6fdf13fb 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -15,7 +15,6 @@ mod ask_leader; mod heartbeat; mod load_balance; -mod lock; mod procedure; mod cluster; @@ -33,7 +32,6 @@ use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat}; use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; -use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; use common_meta::rpc::procedure::{ MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, }; @@ -45,7 +43,6 @@ use common_meta::rpc::store::{ use common_meta::ClusterId; use common_telemetry::info; use heartbeat::Client as HeartbeatClient; -use lock::Client as LockClient; use procedure::Client as ProcedureClient; use snafu::{OptionExt, ResultExt}; use store::Client as StoreClient; @@ -67,7 +64,6 @@ pub struct MetaClientBuilder { role: Role, enable_heartbeat: bool, enable_store: bool, - enable_lock: bool, enable_procedure: bool, enable_access_cluster_info: bool, channel_manager: Option, @@ -123,13 +119,6 @@ impl MetaClientBuilder { } } - pub fn enable_lock(self) -> Self { - Self { - enable_lock: true, - ..self - } - } - pub fn enable_procedure(self) -> Self { Self { enable_procedure: true, @@ -188,10 +177,6 @@ impl MetaClientBuilder { client.store = Some(StoreClient::new(self.id, self.role, mgr.clone())); } - if self.enable_lock { - client.lock = Some(LockClient::new(self.id, self.role, mgr.clone())); - } - if self.enable_procedure { let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone()); client.procedure = Some(ProcedureClient::new( @@ -221,7 +206,6 @@ pub struct MetaClient { channel_manager: ChannelManager, heartbeat: Option, store: Option, - lock: Option, procedure: Option, cluster: Option, } @@ -383,10 +367,6 @@ impl MetaClient { client.start(urls.clone()).await?; info!("Store client started"); } - if let Some(client) = &mut self.lock { - client.start(urls.clone()).await?; - info!("Lock client started"); - } if let Some(client) = &mut self.procedure { client.start(urls.clone()).await?; info!("DDL client started"); @@ -482,15 +462,6 @@ impl MetaClient { .context(ConvertMetaResponseSnafu) } - pub async fn lock(&self, req: LockRequest) -> Result { - self.lock_client()?.lock(req.into()).await.map(Into::into) - } - - pub async fn unlock(&self, req: UnlockRequest) -> Result<()> { - let _ = self.lock_client()?.unlock(req.into()).await?; - Ok(()) - } - /// Query the procedure state by its id. pub async fn query_procedure_state(&self, pid: &str) -> Result { self.procedure_client()?.query_procedure_state(pid).await @@ -538,12 +509,6 @@ impl MetaClient { }) } - pub fn lock_client(&self) -> Result { - self.lock.clone().context(NotStartedSnafu { - name: "lock_client", - }) - } - pub fn procedure_client(&self) -> Result { self.procedure.clone().context(NotStartedSnafu { name: "procedure_client", diff --git a/src/meta-client/src/client/lock.rs b/src/meta-client/src/client/lock.rs deleted file mode 100644 index 66fe077c22..0000000000 --- a/src/meta-client/src/client/lock.rs +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashSet; -use std::sync::Arc; - -use api::v1::meta::lock_client::LockClient; -use api::v1::meta::{LockRequest, LockResponse, Role, UnlockRequest, UnlockResponse}; -use common_grpc::channel_manager::ChannelManager; -use common_telemetry::tracing_context::TracingContext; -use snafu::{ensure, OptionExt, ResultExt}; -use tokio::sync::RwLock; -use tonic::transport::Channel; - -use crate::client::{load_balance, Id}; -use crate::error; -use crate::error::Result; - -#[derive(Clone, Debug)] -pub struct Client { - inner: Arc>, -} - -impl Client { - pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { - let inner = Arc::new(RwLock::new(Inner { - id, - role, - channel_manager, - peers: vec![], - })); - - Self { inner } - } - - pub async fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - let mut inner = self.inner.write().await; - inner.start(urls).await - } - - pub async fn lock(&self, req: LockRequest) -> Result { - let inner = self.inner.read().await; - inner.lock(req).await - } - - pub async fn unlock(&self, req: UnlockRequest) -> Result { - let inner = self.inner.read().await; - inner.unlock(req).await - } -} - -#[derive(Debug)] -struct Inner { - id: Id, - role: Role, - channel_manager: ChannelManager, - peers: Vec, -} - -impl Inner { - async fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - ensure!( - !self.is_started(), - error::IllegalGrpcClientStateSnafu { - err_msg: "Lock client already started", - } - ); - - self.peers = urls - .as_ref() - .iter() - .map(|url| url.as_ref().to_string()) - .collect::>() - .drain() - .collect::>(); - - Ok(()) - } - - fn random_client(&self) -> Result> { - let len = self.peers.len(); - let peer = load_balance::random_get(len, |i| Some(&self.peers[i])).context( - error::IllegalGrpcClientStateSnafu { - err_msg: "Empty peers, lock client may not start yet", - }, - )?; - - self.make_client(peer) - } - - fn make_client(&self, addr: impl AsRef) -> Result> { - let channel = self - .channel_manager - .get(addr) - .context(error::CreateChannelSnafu)?; - - Ok(LockClient::new(channel)) - } - - #[inline] - fn is_started(&self) -> bool { - !self.peers.is_empty() - } - - async fn lock(&self, mut req: LockRequest) -> Result { - let mut client = self.random_client()?; - req.set_header( - self.id, - self.role, - TracingContext::from_current_span().to_w3c(), - ); - let res = client.lock(req).await.map_err(error::Error::from)?; - - Ok(res.into_inner()) - } - - async fn unlock(&self, mut req: UnlockRequest) -> Result { - let mut client = self.random_client()?; - req.set_header( - self.id, - self.role, - TracingContext::from_current_span().to_w3c(), - ); - let res = client.unlock(req).await.map_err(error::Error::from)?; - - Ok(res.into_inner()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_already_start() { - let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) - .await - .unwrap(); - let res = client.start(&["127.0.0.1:1002"]).await; - assert!(res.is_err()); - assert!(matches!( - res.err(), - Some(error::Error::IllegalGrpcClientState { .. }) - )); - } - - #[tokio::test] - async fn test_start_with_duplicate_peers() { - let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) - .await - .unwrap(); - - assert_eq!(1, client.inner.write().await.peers.len()); - } -} diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index dcfac253f3..d10b22c610 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::heartbeat_server::HeartbeatServer; -use api::v1::meta::lock_server::LockServer; use api::v1::meta::procedure_service_server::ProcedureServiceServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; @@ -48,8 +47,6 @@ use crate::election::etcd::EtcdElection; #[cfg(feature = "pg_kvbackend")] use crate::error::InvalidArgumentsSnafu; use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; -use crate::lock::etcd::EtcdLock; -use crate::lock::memory::MemLock; use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; @@ -183,7 +180,6 @@ pub fn router(metasrv: Metasrv) -> Router { .add_service(HeartbeatServer::new(metasrv.clone())) .add_service(StoreServer::new(metasrv.clone())) .add_service(ClusterServer::new(metasrv.clone())) - .add_service(LockServer::new(metasrv.clone())) .add_service(ProcedureServiceServer::new(metasrv.clone())) .add_service(admin::make_admin_service(metasrv)) } @@ -193,13 +189,9 @@ pub async fn metasrv_builder( plugins: Plugins, kv_backend: Option, ) -> Result { - let (kv_backend, election, lock) = match (kv_backend, &opts.backend) { - (Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)), - (None, BackendImpl::MemoryStore) => ( - Arc::new(MemoryKvBackend::new()) as _, - None, - Some(Arc::new(MemLock::default()) as _), - ), + let (kv_backend, election) = match (kv_backend, &opts.backend) { + (Some(kv_backend), _) => (kv_backend, None), + (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None), (None, BackendImpl::EtcdStore) => { let etcd_client = create_etcd_client(opts).await?; let kv_backend = { @@ -224,18 +216,13 @@ pub async fn metasrv_builder( ) .await?, ), - Some(EtcdLock::with_etcd_client( - etcd_client, - opts.store_key_prefix.clone(), - )?), ) } #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { let pg_client = create_postgres_client(opts).await?; let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap(); - // TODO: implement locking and leader election for pg backend. - (kv_backend, None, None) + (kv_backend, None) } }; @@ -253,7 +240,6 @@ pub async fn metasrv_builder( .in_memory(in_memory) .selector(selector) .election(election) - .lock(lock) .plugins(plugins)) } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 4c3d974c0a..6bcac5db2a 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -448,30 +448,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to lock based on etcd"))] - Lock { - #[snafu(source)] - error: etcd_client::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to unlock based on etcd"))] - Unlock { - #[snafu(source)] - error: etcd_client::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to grant lease"))] - LeaseGrant { - #[snafu(source)] - error: etcd_client::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid utf-8 value"))] InvalidUtf8Value { #[snafu(source)] @@ -770,9 +746,6 @@ impl ErrorExt for Error { | Error::ResponseHeaderNotFound { .. } | Error::IsNotLeader { .. } | Error::InvalidHttpBody { .. } - | Error::Lock { .. } - | Error::Unlock { .. } - | Error::LeaseGrant { .. } | Error::ExceededRetryLimit { .. } | Error::SendShutdownSignal { .. } | Error::PusherNotFound { .. } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 01b48f1da0..a438f4255f 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -28,7 +28,6 @@ pub mod flow_meta_alloc; pub mod handler; pub mod key; pub mod lease; -pub mod lock; pub mod metasrv; mod metrics; #[cfg(feature = "mock")] diff --git a/src/meta-srv/src/lock.rs b/src/meta-srv/src/lock.rs deleted file mode 100644 index 53451591da..0000000000 --- a/src/meta-srv/src/lock.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod etcd; -pub(crate) mod memory; - -use std::sync::Arc; - -use common_telemetry::error; - -use crate::error::Result; - -pub type Key = Vec; - -pub const DEFAULT_EXPIRE_TIME_SECS: u64 = 10; - -pub struct Opts { - // If the expiration time is exceeded and currently holds the lock, the lock is - // automatically released. - pub expire_secs: Option, -} - -impl Default for Opts { - fn default() -> Self { - Opts { - expire_secs: Some(DEFAULT_EXPIRE_TIME_SECS), - } - } -} - -#[async_trait::async_trait] -pub trait DistLock: Send + Sync { - // Lock acquires a distributed shared lock on a given named lock. On success, it - // will return a unique key that exists so long as the lock is held by the caller. - async fn lock(&self, name: Vec, opts: Opts) -> Result; - - // Unlock takes a key returned by Lock and releases the hold on lock. - async fn unlock(&self, key: Vec) -> Result<()>; -} - -pub type DistLockRef = Arc; - -pub struct DistLockGuard<'a> { - lock: &'a DistLockRef, - name: Vec, - key: Option, -} - -impl<'a> DistLockGuard<'a> { - pub fn new(lock: &'a DistLockRef, name: Vec) -> Self { - Self { - lock, - name, - key: None, - } - } - - pub async fn lock(&mut self) -> Result<()> { - if self.key.is_some() { - return Ok(()); - } - let key = self - .lock - .lock( - self.name.clone(), - Opts { - expire_secs: Some(2), - }, - ) - .await?; - self.key = Some(key); - Ok(()) - } -} - -impl Drop for DistLockGuard<'_> { - fn drop(&mut self) { - if let Some(key) = self.key.take() { - let lock = self.lock.clone(); - let name = self.name.clone(); - let _handle = common_runtime::spawn_global(async move { - if let Err(e) = lock.unlock(key).await { - error!(e; "Failed to unlock '{}'", String::from_utf8_lossy(&name)); - } - }); - } - } -} diff --git a/src/meta-srv/src/lock/etcd.rs b/src/meta-srv/src/lock/etcd.rs deleted file mode 100644 index 3f53b40e15..0000000000 --- a/src/meta-srv/src/lock/etcd.rs +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use etcd_client::{Client, LockOptions}; -use snafu::ResultExt; - -use super::{DistLock, DistLockRef, Opts, DEFAULT_EXPIRE_TIME_SECS}; -use crate::error; -use crate::error::Result; - -/// A implementation of distributed lock based on etcd. The Clone of EtcdLock is cheap. -#[derive(Clone)] -pub struct EtcdLock { - client: Client, - store_key_prefix: String, -} - -impl EtcdLock { - pub async fn with_endpoints(endpoints: S, store_key_prefix: String) -> Result - where - E: AsRef, - S: AsRef<[E]>, - { - let client = Client::connect(endpoints, None) - .await - .context(error::ConnectEtcdSnafu)?; - - Self::with_etcd_client(client, store_key_prefix) - } - - pub fn with_etcd_client(client: Client, store_key_prefix: String) -> Result { - Ok(Arc::new(EtcdLock { - client, - store_key_prefix, - })) - } - - fn lock_key(&self, key: Vec) -> Vec { - if self.store_key_prefix.is_empty() { - key - } else { - let mut prefix = self.store_key_prefix.as_bytes().to_vec(); - prefix.extend_from_slice(&key); - prefix - } - } -} - -#[async_trait::async_trait] -impl DistLock for EtcdLock { - async fn lock(&self, key: Vec, opts: Opts) -> Result> { - let expire = opts.expire_secs.unwrap_or(DEFAULT_EXPIRE_TIME_SECS) as i64; - - let mut client = self.client.clone(); - - let resp = client - .lease_grant(expire, None) - .await - .context(error::LeaseGrantSnafu)?; - - let lease_id = resp.id(); - let lock_opts = LockOptions::new().with_lease(lease_id); - - let resp = client - .lock(self.lock_key(key), Some(lock_opts)) - .await - .context(error::LockSnafu)?; - - Ok(resp.key().to_vec()) - } - - async fn unlock(&self, key: Vec) -> Result<()> { - let mut client = self.client.clone(); - let _ = client - .unlock(self.lock_key(key)) - .await - .context(error::UnlockSnafu)?; - Ok(()) - } -} diff --git a/src/meta-srv/src/lock/memory.rs b/src/meta-srv/src/lock/memory.rs deleted file mode 100644 index d312700d12..0000000000 --- a/src/meta-srv/src/lock/memory.rs +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use dashmap::DashMap; -use tokio::sync::{Mutex, OwnedMutexGuard}; - -use crate::error::Result; -use crate::lock::{DistLock, Key, Opts}; - -#[derive(Default)] -pub(crate) struct MemLock { - mutexes: DashMap>>, - guards: DashMap>, -} - -#[async_trait] -impl DistLock for MemLock { - async fn lock(&self, key: Vec, _opts: Opts) -> Result { - let mutex = self - .mutexes - .entry(key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(()))) - .clone(); - - let guard = mutex.lock_owned().await; - - let _ = self.guards.insert(key.clone(), guard); - Ok(key) - } - - async fn unlock(&self, key: Vec) -> Result<()> { - // drop the guard, so that the mutex can be unlocked, - // effectively make the `mutex.lock_owned` in `lock` method to proceed - let _ = self.guards.remove(&key); - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::atomic::{AtomicU32, Ordering}; - - use rand::seq::SliceRandom; - - use super::*; - - #[tokio::test(flavor = "multi_thread")] - async fn test_mem_lock_concurrently() { - let lock = Arc::new(MemLock::default()); - - let keys = (0..10) - .map(|i| format!("my-lock-{i}").into_bytes()) - .collect::>(); - let counters: [(Key, AtomicU32); 10] = keys - .iter() - .map(|x| (x.clone(), AtomicU32::new(0))) - .collect::>() - .try_into() - .unwrap(); - let counters = Arc::new(HashMap::from(counters)); - - let tasks = (0..100) - .map(|_| { - let mut keys = keys.clone(); - keys.shuffle(&mut rand::thread_rng()); - - let lock_clone = lock.clone(); - let counters_clone = counters.clone(); - tokio::spawn(async move { - // every key counter will be added by 1 for 10 times - for i in 0..100 { - let key = &keys[i % keys.len()]; - assert!(lock_clone - .lock(key.clone(), Opts { expire_secs: None }) - .await - .is_ok()); - - // Intentionally create a critical section: - // if our MemLock is flawed, the resulting counter is wrong. - // - // Note that AtomicU32 is only used to enable the updates from multiple tasks, - // does not make any guarantee about the correctness of the result. - - let counter = counters_clone.get(key).unwrap(); - let v = counter.load(Ordering::Relaxed); - counter.store(v + 1, Ordering::Relaxed); - - lock_clone.unlock(key.clone()).await.unwrap(); - } - }) - }) - .collect::>(); - let _ = futures::future::join_all(tasks).await; - - assert!(counters.values().all(|x| x.load(Ordering::Relaxed) == 1000)); - } -} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 3cdb64e1e0..c43eec60d4 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -57,7 +57,6 @@ use crate::error::{ use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::HeartbeatHandlerGroupRef; use crate::lease::lookup_datanode_peer; -use crate::lock::DistLockRef; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::procedure::ProcedureManagerListenerAdapter; use crate::pubsub::{PublisherRef, SubscriptionManagerRef}; @@ -356,7 +355,6 @@ pub struct Metasrv { flow_selector: SelectorRef, handler_group: HeartbeatHandlerGroupRef, election: Option, - lock: DistLockRef, procedure_manager: ProcedureManagerRef, mailbox: MailboxRef, procedure_executor: ProcedureExecutorRef, @@ -568,10 +566,6 @@ impl Metasrv { self.election.as_ref() } - pub fn lock(&self) -> &DistLockRef { - &self.lock - } - pub fn mailbox(&self) -> &MailboxRef { &self.mailbox } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 050f825174..db01b3ec9d 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -52,8 +52,6 @@ use crate::handler::{ HeartbeatHandlerGroup, HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers, }; use crate::lease::MetaPeerLookupService; -use crate::lock::memory::MemLock; -use crate::lock::DistLockRef; use crate::metasrv::{ ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; @@ -79,7 +77,6 @@ pub struct MetasrvBuilder { handler_group: Option, election: Option, meta_peer_client: Option, - lock: Option, node_manager: Option, plugins: Option, table_metadata_allocator: Option, @@ -95,7 +92,6 @@ impl MetasrvBuilder { meta_peer_client: None, election: None, options: None, - lock: None, node_manager: None, plugins: None, table_metadata_allocator: None, @@ -137,11 +133,6 @@ impl MetasrvBuilder { self } - pub fn lock(mut self, lock: Option) -> Self { - self.lock = lock; - self - } - pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self { self.node_manager = Some(node_manager); self @@ -171,7 +162,6 @@ impl MetasrvBuilder { in_memory, selector, handler_group, - lock, node_manager, plugins, table_metadata_allocator, @@ -205,7 +195,6 @@ impl MetasrvBuilder { let flow_metadata_manager = Arc::new(FlowMetadataManager::new( leader_cached_kv_backend.clone() as _, )); - let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let selector_ctx = SelectorContext { server_addr: options.server_addr.clone(), datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS, @@ -384,7 +373,6 @@ impl MetasrvBuilder { flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)), handler_group: Arc::new(handler_group), election, - lock, procedure_manager, mailbox, procedure_executor: ddl_manager, diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs index c3e2e781e8..e260b8b980 100644 --- a/src/meta-srv/src/service.rs +++ b/src/meta-srv/src/service.rs @@ -20,7 +20,6 @@ use tonic::{Response, Status}; pub mod admin; pub mod cluster; mod heartbeat; -pub mod lock; pub mod mailbox; pub mod procedure; pub mod store; diff --git a/src/meta-srv/src/service/lock.rs b/src/meta-srv/src/service/lock.rs deleted file mode 100644 index 4334bdfc37..0000000000 --- a/src/meta-srv/src/service/lock.rs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::meta::{lock_server, LockRequest, LockResponse, UnlockRequest, UnlockResponse}; -use tonic::{Request, Response}; - -use super::GrpcResult; -use crate::lock::Opts; -use crate::metasrv::Metasrv; - -#[async_trait::async_trait] -impl lock_server::Lock for Metasrv { - async fn lock(&self, request: Request) -> GrpcResult { - let LockRequest { - name, expire_secs, .. - } = request.into_inner(); - let expire_secs = Some(expire_secs as u64); - - let key = self.lock().lock(name, Opts { expire_secs }).await?; - - let resp = LockResponse { - key, - ..Default::default() - }; - - Ok(Response::new(resp)) - } - - async fn unlock(&self, request: Request) -> GrpcResult { - let UnlockRequest { key, .. } = request.into_inner(); - - let _ = self.lock().unlock(key).await?; - - let resp = UnlockResponse { - ..Default::default() - }; - - Ok(Response::new(resp)) - } -}