Compare commits

1 Commit

Author: jeremyhi
SHA1: 856c0280f5
Date: 2024-10-12 09:04:22 +00:00
Message: feat: remove the distributed lock (#4825)

* feat: remove the distributed lock as we do not need it any more
* chore: delete todo comment
* chore: remove unused error

17 changed files with 22 additions and 867 deletions
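
For a quick overview of the client-facing API this commit removes (MetaClientBuilder::enable_lock, MetaClient::lock, and MetaClient::unlock), the sketch below condenses the deleted example file shown further down. It keeps the same types and calls but elides channel configuration and the concurrent scenarios; it no longer compiles after this change.

use api::v1::meta::Role;
use common_meta::rpc::lock::{LockRequest, UnlockRequest};
use meta_client::client::MetaClientBuilder;

#[tokio::main]
async fn main() {
    // Build a meta client with the (now removed) lock client enabled.
    let mut meta_client = MetaClientBuilder::new(1000, 2000, Role::Datanode)
        .enable_lock()
        .build();
    meta_client.start(&["127.0.0.1:3002"]).await.unwrap();

    // Acquire the distributed lock; the returned key identifies this holder.
    let lock_result = meta_client
        .lock(LockRequest { name: b"lock_name".to_vec(), expire_secs: 60 })
        .await
        .unwrap();

    // ... critical section (should finish well before `expire_secs`) ...

    // Release the lock with the key returned by `lock`.
    meta_client
        .unlock(UnlockRequest { key: lock_result.key })
        .await
        .unwrap();
}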

View File

@@ -1,129 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::rpc::lock::{LockRequest, UnlockRequest};
use meta_client::client::MetaClientBuilder;
use meta_client::MetaClientRef;
use tracing::{info, subscriber};
use tracing_subscriber::FmtSubscriber;
fn main() {
subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap();
run();
}
#[tokio::main]
async fn run() {
let id = (1000u64, 2000u64);
let config = ChannelConfig::new()
.timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode)
.enable_lock()
.channel_manager(channel_manager)
.build();
meta_client.start(&["127.0.0.1:3002"]).await.unwrap();
let meta_client = Arc::new(meta_client);
run_normal(meta_client.clone()).await;
run_multi_thread(meta_client.clone()).await;
run_multi_thread_with_one_timeout(meta_client).await;
}
async fn run_normal(meta_client: MetaClientRef) {
let name = "lock_name".as_bytes().to_vec();
let expire_secs = 60;
let lock_req = LockRequest { name, expire_secs };
let lock_result = meta_client.lock(lock_req).await.unwrap();
let key = lock_result.key;
info!(
"lock success! Returned key: {}",
String::from_utf8(key.clone()).unwrap()
);
// It is recommended that the lock be held for less time than the gRPC channel timeout
info!("do some work, take 3 seconds");
tokio::time::sleep(Duration::from_secs(3)).await;
let unlock_req = UnlockRequest { key };
meta_client.unlock(unlock_req).await.unwrap();
info!("unlock success!");
}
async fn run_multi_thread(meta_client: MetaClientRef) {
let meta_client_clone = meta_client.clone();
let join1 = tokio::spawn(async move {
run_normal(meta_client_clone.clone()).await;
});
tokio::time::sleep(Duration::from_secs(1)).await;
let join2 = tokio::spawn(async move {
run_normal(meta_client).await;
});
join1.await.unwrap();
join2.await.unwrap();
}
async fn run_multi_thread_with_one_timeout(meta_client: MetaClientRef) {
let meta_client_clone = meta_client.clone();
let join1 = tokio::spawn(async move {
run_with_timeout(meta_client_clone.clone()).await;
});
tokio::time::sleep(Duration::from_secs(1)).await;
let join2 = tokio::spawn(async move {
run_normal(meta_client).await;
});
join1.await.unwrap();
join2.await.unwrap();
}
async fn run_with_timeout(meta_client: MetaClientRef) {
let name = "lock_name".as_bytes().to_vec();
let expire_secs = 5;
let lock_req = LockRequest { name, expire_secs };
let lock_result = meta_client.lock(lock_req).await.unwrap();
let key = lock_result.key;
info!(
"lock success! Returned key: {}",
String::from_utf8(key.clone()).unwrap()
);
// It is recommended that the lock be held for less time than the gRPC channel timeout
info!("do some work, take 20 seconds");
tokio::time::sleep(Duration::from_secs(20)).await;
let unlock_req = UnlockRequest { key };
meta_client.unlock(unlock_req).await.unwrap();
info!("unlock success!");
}

View File

@@ -15,7 +15,6 @@
mod ask_leader;
mod heartbeat;
mod load_balance;
mod lock;
mod procedure;
mod cluster;
@@ -33,7 +32,6 @@ use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use common_meta::rpc::procedure::{
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
};
@@ -45,7 +43,6 @@ use common_meta::rpc::store::{
use common_meta::ClusterId;
use common_telemetry::info;
use heartbeat::Client as HeartbeatClient;
use lock::Client as LockClient;
use procedure::Client as ProcedureClient;
use snafu::{OptionExt, ResultExt};
use store::Client as StoreClient;
@@ -67,7 +64,6 @@ pub struct MetaClientBuilder {
role: Role,
enable_heartbeat: bool,
enable_store: bool,
enable_lock: bool,
enable_procedure: bool,
enable_access_cluster_info: bool,
channel_manager: Option<ChannelManager>,
@@ -123,13 +119,6 @@ impl MetaClientBuilder {
}
}
pub fn enable_lock(self) -> Self {
Self {
enable_lock: true,
..self
}
}
pub fn enable_procedure(self) -> Self {
Self {
enable_procedure: true,
@@ -188,10 +177,6 @@ impl MetaClientBuilder {
client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_lock {
client.lock = Some(LockClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_procedure {
let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
client.procedure = Some(ProcedureClient::new(
@@ -221,7 +206,6 @@ pub struct MetaClient {
channel_manager: ChannelManager,
heartbeat: Option<HeartbeatClient>,
store: Option<StoreClient>,
lock: Option<LockClient>,
procedure: Option<ProcedureClient>,
cluster: Option<ClusterClient>,
}
@@ -383,10 +367,6 @@ impl MetaClient {
client.start(urls.clone()).await?;
info!("Store client started");
}
if let Some(client) = &mut self.lock {
client.start(urls.clone()).await?;
info!("Lock client started");
}
if let Some(client) = &mut self.procedure {
client.start(urls.clone()).await?;
info!("DDL client started");
@@ -482,15 +462,6 @@ impl MetaClient {
.context(ConvertMetaResponseSnafu)
}
pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
self.lock_client()?.lock(req.into()).await.map(Into::into)
}
pub async fn unlock(&self, req: UnlockRequest) -> Result<()> {
let _ = self.lock_client()?.unlock(req.into()).await?;
Ok(())
}
/// Query the procedure state by its id.
pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
self.procedure_client()?.query_procedure_state(pid).await
@@ -538,12 +509,6 @@ impl MetaClient {
})
}
pub fn lock_client(&self) -> Result<LockClient> {
self.lock.clone().context(NotStartedSnafu {
name: "lock_client",
})
}
pub fn procedure_client(&self) -> Result<ProcedureClient> {
self.procedure.clone().context(NotStartedSnafu {
name: "procedure_client",

View File

@@ -1,178 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::lock_client::LockClient;
use api::v1::meta::{LockRequest, LockResponse, Role, UnlockRequest, UnlockResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::client::{load_balance, Id};
use crate::error;
use crate::error::Result;
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
peers: vec![],
}));
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
}
pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
let inner = self.inner.read().await;
inner.lock(req).await
}
pub async fn unlock(&self, req: UnlockRequest) -> Result<UnlockResponse> {
let inner = self.inner.read().await;
inner.unlock(req).await
}
}
#[derive(Debug)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
peers: Vec<String>,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Lock client already started",
}
);
self.peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<HashSet<_>>()
.drain()
.collect::<Vec<_>>();
Ok(())
}
fn random_client(&self) -> Result<LockClient<Channel>> {
let len = self.peers.len();
let peer = load_balance::random_get(len, |i| Some(&self.peers[i])).context(
error::IllegalGrpcClientStateSnafu {
err_msg: "Empty peers, lock client may not start yet",
},
)?;
self.make_client(peer)
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<LockClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(LockClient::new(channel))
}
#[inline]
fn is_started(&self) -> bool {
!self.peers.is_empty()
}
async fn lock(&self, mut req: LockRequest) -> Result<LockResponse> {
let mut client = self.random_client()?;
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.lock(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
async fn unlock(&self, mut req: UnlockRequest) -> Result<UnlockResponse> {
let mut client = self.random_client()?;
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let res = client.unlock(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(
res.err(),
Some(error::Error::IllegalGrpcClientState { .. })
));
}
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
}

View File

@@ -16,7 +16,6 @@ use std::sync::Arc;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
@@ -48,8 +47,6 @@ use crate::election::etcd::EtcdElection;
#[cfg(feature = "pg_kvbackend")]
use crate::error::InvalidArgumentsSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::lock::etcd::EtcdLock;
use crate::lock::memory::MemLock;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
@@ -183,7 +180,6 @@ pub fn router(metasrv: Metasrv) -> Router {
.add_service(HeartbeatServer::new(metasrv.clone()))
.add_service(StoreServer::new(metasrv.clone()))
.add_service(ClusterServer::new(metasrv.clone()))
.add_service(LockServer::new(metasrv.clone()))
.add_service(ProcedureServiceServer::new(metasrv.clone()))
.add_service(admin::make_admin_service(metasrv))
}
@@ -193,13 +189,9 @@ pub async fn metasrv_builder(
plugins: Plugins,
kv_backend: Option<KvBackendRef>,
) -> Result<MetasrvBuilder> {
let (kv_backend, election, lock) = match (kv_backend, &opts.backend) {
(Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)),
(None, BackendImpl::MemoryStore) => (
Arc::new(MemoryKvBackend::new()) as _,
None,
Some(Arc::new(MemLock::default()) as _),
),
let (kv_backend, election) = match (kv_backend, &opts.backend) {
(Some(kv_backend), _) => (kv_backend, None),
(None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
(None, BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(opts).await?;
let kv_backend = {
@@ -224,18 +216,13 @@ pub async fn metasrv_builder(
)
.await?,
),
Some(EtcdLock::with_etcd_client(
etcd_client,
opts.store_key_prefix.clone(),
)?),
)
}
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pg_client = create_postgres_client(opts).await?;
let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap();
// TODO: implement locking and leader election for pg backend.
(kv_backend, None, None)
(kv_backend, None)
}
};
@@ -253,7 +240,6 @@ pub async fn metasrv_builder(
.in_memory(in_memory)
.selector(selector)
.election(election)
.lock(lock)
.plugins(plugins))
}

View File

@@ -448,30 +448,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to lock based on etcd"))]
Lock {
#[snafu(source)]
error: etcd_client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to unlock based on etcd"))]
Unlock {
#[snafu(source)]
error: etcd_client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to grant lease"))]
LeaseGrant {
#[snafu(source)]
error: etcd_client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid utf-8 value"))]
InvalidUtf8Value {
#[snafu(source)]
@@ -770,9 +746,6 @@ impl ErrorExt for Error {
| Error::ResponseHeaderNotFound { .. }
| Error::IsNotLeader { .. }
| Error::InvalidHttpBody { .. }
| Error::Lock { .. }
| Error::Unlock { .. }
| Error::LeaseGrant { .. }
| Error::ExceededRetryLimit { .. }
| Error::SendShutdownSignal { .. }
| Error::PusherNotFound { .. }

View File

@@ -28,7 +28,6 @@ pub mod flow_meta_alloc;
pub mod handler;
pub mod key;
pub mod lease;
pub mod lock;
pub mod metasrv;
mod metrics;
#[cfg(feature = "mock")]

View File

@@ -1,99 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod etcd;
pub(crate) mod memory;
use std::sync::Arc;
use common_telemetry::error;
use crate::error::Result;
pub type Key = Vec<u8>;
pub const DEFAULT_EXPIRE_TIME_SECS: u64 = 10;
pub struct Opts {
// If the holder still has the lock when the expiration time is exceeded,
// the lock is released automatically.
pub expire_secs: Option<u64>,
}
impl Default for Opts {
fn default() -> Self {
Opts {
expire_secs: Some(DEFAULT_EXPIRE_TIME_SECS),
}
}
}
#[async_trait::async_trait]
pub trait DistLock: Send + Sync {
// Lock acquires a distributed shared lock on a given named lock. On success, it
// will return a unique key that exists so long as the lock is held by the caller.
async fn lock(&self, name: Vec<u8>, opts: Opts) -> Result<Key>;
// Unlock takes a key returned by Lock and releases the hold on lock.
async fn unlock(&self, key: Vec<u8>) -> Result<()>;
}
pub type DistLockRef = Arc<dyn DistLock>;
pub struct DistLockGuard<'a> {
lock: &'a DistLockRef,
name: Vec<u8>,
key: Option<Key>,
}
impl<'a> DistLockGuard<'a> {
pub fn new(lock: &'a DistLockRef, name: Vec<u8>) -> Self {
Self {
lock,
name,
key: None,
}
}
pub async fn lock(&mut self) -> Result<()> {
if self.key.is_some() {
return Ok(());
}
let key = self
.lock
.lock(
self.name.clone(),
Opts {
expire_secs: Some(2),
},
)
.await?;
self.key = Some(key);
Ok(())
}
}
impl Drop for DistLockGuard<'_> {
fn drop(&mut self) {
if let Some(key) = self.key.take() {
let lock = self.lock.clone();
let name = self.name.clone();
let _handle = common_runtime::spawn_global(async move {
if let Err(e) = lock.unlock(key).await {
error!(e; "Failed to unlock '{}'", String::from_utf8_lossy(&name));
}
});
}
}
}

View File

@@ -1,93 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use etcd_client::{Client, LockOptions};
use snafu::ResultExt;
use super::{DistLock, DistLockRef, Opts, DEFAULT_EXPIRE_TIME_SECS};
use crate::error;
use crate::error::Result;
/// An implementation of a distributed lock based on etcd. Cloning `EtcdLock` is cheap.
#[derive(Clone)]
pub struct EtcdLock {
client: Client,
store_key_prefix: String,
}
impl EtcdLock {
pub async fn with_endpoints<E, S>(endpoints: S, store_key_prefix: String) -> Result<DistLockRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
{
let client = Client::connect(endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;
Self::with_etcd_client(client, store_key_prefix)
}
pub fn with_etcd_client(client: Client, store_key_prefix: String) -> Result<DistLockRef> {
Ok(Arc::new(EtcdLock {
client,
store_key_prefix,
}))
}
fn lock_key(&self, key: Vec<u8>) -> Vec<u8> {
if self.store_key_prefix.is_empty() {
key
} else {
let mut prefix = self.store_key_prefix.as_bytes().to_vec();
prefix.extend_from_slice(&key);
prefix
}
}
}
#[async_trait::async_trait]
impl DistLock for EtcdLock {
async fn lock(&self, key: Vec<u8>, opts: Opts) -> Result<Vec<u8>> {
let expire = opts.expire_secs.unwrap_or(DEFAULT_EXPIRE_TIME_SECS) as i64;
let mut client = self.client.clone();
let resp = client
.lease_grant(expire, None)
.await
.context(error::LeaseGrantSnafu)?;
let lease_id = resp.id();
let lock_opts = LockOptions::new().with_lease(lease_id);
let resp = client
.lock(self.lock_key(key), Some(lock_opts))
.await
.context(error::LockSnafu)?;
Ok(resp.key().to_vec())
}
async fn unlock(&self, key: Vec<u8>) -> Result<()> {
let mut client = self.client.clone();
let _ = client
.unlock(self.lock_key(key))
.await
.context(error::UnlockSnafu)?;
Ok(())
}
}

View File

@@ -1,112 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use dashmap::DashMap;
use tokio::sync::{Mutex, OwnedMutexGuard};
use crate::error::Result;
use crate::lock::{DistLock, Key, Opts};
#[derive(Default)]
pub(crate) struct MemLock {
mutexes: DashMap<Key, Arc<Mutex<()>>>,
guards: DashMap<Key, OwnedMutexGuard<()>>,
}
#[async_trait]
impl DistLock for MemLock {
async fn lock(&self, key: Vec<u8>, _opts: Opts) -> Result<Key> {
let mutex = self
.mutexes
.entry(key.clone())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone();
let guard = mutex.lock_owned().await;
let _ = self.guards.insert(key.clone(), guard);
Ok(key)
}
async fn unlock(&self, key: Vec<u8>) -> Result<()> {
// Drop the guard so that the mutex is unlocked, allowing a pending
// `mutex.lock_owned` in the `lock` method to proceed.
let _ = self.guards.remove(&key);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use rand::seq::SliceRandom;
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn test_mem_lock_concurrently() {
let lock = Arc::new(MemLock::default());
let keys = (0..10)
.map(|i| format!("my-lock-{i}").into_bytes())
.collect::<Vec<Key>>();
let counters: [(Key, AtomicU32); 10] = keys
.iter()
.map(|x| (x.clone(), AtomicU32::new(0)))
.collect::<Vec<_>>()
.try_into()
.unwrap();
let counters = Arc::new(HashMap::from(counters));
let tasks = (0..100)
.map(|_| {
let mut keys = keys.clone();
keys.shuffle(&mut rand::thread_rng());
let lock_clone = lock.clone();
let counters_clone = counters.clone();
tokio::spawn(async move {
// each key's counter is incremented by 1 ten times in this task
for i in 0..100 {
let key = &keys[i % keys.len()];
assert!(lock_clone
.lock(key.clone(), Opts { expire_secs: None })
.await
.is_ok());
// Intentionally create a critical section:
// if our MemLock is flawed, the resulting counter is wrong.
//
// Note that AtomicU32 is only used to allow updates from multiple tasks;
// it does not by itself guarantee the correctness of the result.
let counter = counters_clone.get(key).unwrap();
let v = counter.load(Ordering::Relaxed);
counter.store(v + 1, Ordering::Relaxed);
lock_clone.unlock(key.clone()).await.unwrap();
}
})
})
.collect::<Vec<_>>();
let _ = futures::future::join_all(tasks).await;
assert!(counters.values().all(|x| x.load(Ordering::Relaxed) == 1000));
}
}

View File

@@ -57,7 +57,6 @@ use crate::error::{
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroupRef;
use crate::lease::lookup_datanode_peer;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
@@ -356,7 +355,6 @@ pub struct Metasrv {
flow_selector: SelectorRef,
handler_group: HeartbeatHandlerGroupRef,
election: Option<ElectionRef>,
lock: DistLockRef,
procedure_manager: ProcedureManagerRef,
mailbox: MailboxRef,
procedure_executor: ProcedureExecutorRef,
@@ -568,10 +566,6 @@ impl Metasrv {
self.election.as_ref()
}
pub fn lock(&self) -> &DistLockRef {
&self.lock
}
pub fn mailbox(&self) -> &MailboxRef {
&self.mailbox
}

View File

@@ -52,8 +52,6 @@ use crate::handler::{
HeartbeatHandlerGroup, HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers,
};
use crate::lease::MetaPeerLookupService;
use crate::lock::memory::MemLock;
use crate::lock::DistLockRef;
use crate::metasrv::{
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
@@ -79,7 +77,6 @@ pub struct MetasrvBuilder {
handler_group: Option<HeartbeatHandlerGroup>,
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClientRef>,
lock: Option<DistLockRef>,
node_manager: Option<NodeManagerRef>,
plugins: Option<Plugins>,
table_metadata_allocator: Option<TableMetadataAllocatorRef>,
@@ -95,7 +92,6 @@ impl MetasrvBuilder {
meta_peer_client: None,
election: None,
options: None,
lock: None,
node_manager: None,
plugins: None,
table_metadata_allocator: None,
@@ -137,11 +133,6 @@ impl MetasrvBuilder {
self
}
pub fn lock(mut self, lock: Option<DistLockRef>) -> Self {
self.lock = lock;
self
}
pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
self.node_manager = Some(node_manager);
self
@@ -171,7 +162,6 @@ impl MetasrvBuilder {
in_memory,
selector,
handler_group,
lock,
node_manager,
plugins,
table_metadata_allocator,
@@ -205,7 +195,6 @@ impl MetasrvBuilder {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
let selector_ctx = SelectorContext {
server_addr: options.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
@@ -384,7 +373,6 @@ impl MetasrvBuilder {
flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
handler_group: Arc::new(handler_group),
election,
lock,
procedure_manager,
mailbox,
procedure_executor: ddl_manager,

View File

@@ -20,7 +20,6 @@ use tonic::{Response, Status};
pub mod admin;
pub mod cluster;
mod heartbeat;
pub mod lock;
pub mod mailbox;
pub mod procedure;
pub mod store;

View File

@@ -1,51 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{lock_server, LockRequest, LockResponse, UnlockRequest, UnlockResponse};
use tonic::{Request, Response};
use super::GrpcResult;
use crate::lock::Opts;
use crate::metasrv::Metasrv;
#[async_trait::async_trait]
impl lock_server::Lock for Metasrv {
async fn lock(&self, request: Request<LockRequest>) -> GrpcResult<LockResponse> {
let LockRequest {
name, expire_secs, ..
} = request.into_inner();
let expire_secs = Some(expire_secs as u64);
let key = self.lock().lock(name, Opts { expire_secs }).await?;
let resp = LockResponse {
key,
..Default::default()
};
Ok(Response::new(resp))
}
async fn unlock(&self, request: Request<UnlockRequest>) -> GrpcResult<UnlockResponse> {
let UnlockRequest { key, .. } = request.into_inner();
let _ = self.lock().unlock(key).await?;
let resp = UnlockResponse {
..Default::default()
};
Ok(Response::new(resp))
}
}

View File

@@ -64,19 +64,15 @@ impl MetricEngineInner {
/// Return the physical region id behind this logical region
async fn alter_logical_region(
&self,
logical_region_id: RegionId,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<RegionId> {
let physical_region_id = {
let state = &self.state.read().unwrap();
state
.get_physical_region_id(logical_region_id)
.with_context(|| {
error!("Trying to alter an nonexistent region {logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
})?
state.get_physical_region_id(region_id).with_context(|| {
error!("Trying to alter an nonexistent region {region_id}");
LogicalRegionNotFoundSnafu { region_id }
})?
};
// only handle adding column
@@ -91,7 +87,7 @@ impl MetricEngineInner {
.metadata_region
.column_semantic_type(
metadata_region_id,
logical_region_id,
region_id,
&col.column_metadata.column_schema.name,
)
.await?
@@ -106,7 +102,7 @@ impl MetricEngineInner {
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_region_id,
region_id,
columns_to_add,
)
.await?;
@@ -114,16 +110,10 @@ impl MetricEngineInner {
// register columns to logical region
for col in columns {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, &col.column_metadata)
.add_column(metadata_region_id, region_id, &col.column_metadata)
.await?;
}
// invalidate the logical column cache
self.state
.write()
.unwrap()
.invalid_logical_column_cache(logical_region_id);
Ok(physical_region_id)
}

View File

@@ -169,11 +169,11 @@ impl MetricEngineInner {
) -> Result<Vec<usize>> {
// project on logical columns
let all_logical_columns = self
.load_logical_column_names(physical_region_id, logical_region_id)
.load_logical_columns(physical_region_id, logical_region_id)
.await?;
let projected_logical_names = origin_projection
.iter()
.map(|i| all_logical_columns[*i].clone())
.map(|i| all_logical_columns[*i].column_schema.name.clone())
.collect::<Vec<_>>();
// generate physical projection
@@ -200,8 +200,10 @@ impl MetricEngineInner {
logical_region_id: RegionId,
) -> Result<Vec<usize>> {
let logical_columns = self
.load_logical_column_names(physical_region_id, logical_region_id)
.await?;
.load_logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.map(|col| col.column_schema.name);
let mut projection = Vec::with_capacity(logical_columns.len());
let data_region_id = utils::to_data_region_id(physical_region_id);
let physical_metadata = self

View File

@@ -23,25 +23,13 @@ use crate::error::Result;
impl MetricEngineInner {
/// Load column metadata of a logical region.
///
/// The return value is ordered on column name.
/// The return value is ordered on [ColumnId].
pub async fn load_logical_columns(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> {
// First try to load from state cache
if let Some(columns) = self
.state
.read()
.unwrap()
.logical_columns()
.get(&logical_region_id)
{
return Ok(columns.clone());
}
// Else load from metadata region and update the cache.
// Load logical and physical columns, and intersect them to get logical column metadata.
// load logical and physical columns, and intersect them to get logical column metadata
let mut logical_column_metadata = self
.metadata_region
.logical_columns(physical_region_id, logical_region_id)
@@ -49,48 +37,11 @@ impl MetricEngineInner {
.into_iter()
.map(|(_, column_metadata)| column_metadata)
.collect::<Vec<_>>();
// Sort columns on column name to ensure the order
// sort columns on column id to ensure the order
logical_column_metadata
.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
// Update cache
self.state
.write()
.unwrap()
.add_logical_columns(logical_region_id, logical_column_metadata.clone());
Ok(logical_column_metadata)
}
/// Load logical column names of a logical region.
///
/// The return value is ordered on column name alphabetically.
pub async fn load_logical_column_names(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<String>> {
// First try to load from state cache
if let Some(columns) = self
.state
.read()
.unwrap()
.logical_columns()
.get(&logical_region_id)
{
return Ok(columns
.iter()
.map(|c| c.column_schema.name.clone())
.collect());
}
// Else load from metadata region
let columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.map(|c| c.column_schema.name)
.collect::<Vec<_>>();
Ok(columns)
}
}

View File

@@ -17,7 +17,6 @@
use std::collections::{HashMap, HashSet};
use snafu::OptionExt;
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use crate::error::{PhysicalRegionNotFoundSnafu, Result};
@@ -36,10 +35,6 @@ pub(crate) struct MetricEngineState {
/// Cache for the columns of physical regions.
/// The region id in key is the data region id.
physical_columns: HashMap<RegionId, HashSet<String>>,
/// Cache for the column metadata of logical regions.
/// The column order is the same as the order in the metadata, which is
/// alphabetically ordered on column name.
logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
}
impl MetricEngineState {
@@ -85,21 +80,6 @@ impl MetricEngineState {
.insert(logical_region_id, physical_region_id);
}
/// Add and reorder logical columns.
///
/// Caller should make sure:
/// 1. there are no duplicate columns
/// 2. the column order is the same as the order in the metadata, which is
/// alphabetically ordered on column name.
pub fn add_logical_columns(
&mut self,
logical_region_id: RegionId,
new_columns: impl IntoIterator<Item = ColumnMetadata>,
) {
let columns = self.logical_columns.entry(logical_region_id).or_default();
columns.extend(new_columns);
}
pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
self.logical_regions.get(&logical_region_id).copied()
}
@@ -108,10 +88,6 @@ impl MetricEngineState {
&self.physical_columns
}
pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
&self.logical_columns
}
pub fn physical_regions(&self) -> &HashMap<RegionId, HashSet<RegionId>> {
&self.physical_regions
}
@@ -153,15 +129,9 @@ impl MetricEngineState {
.unwrap() // Safety: physical_region_id is got from physical_regions
.remove(&logical_region_id);
self.logical_columns.remove(&logical_region_id);
Ok(())
}
pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) {
self.logical_columns.remove(&logical_region_id);
}
pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
self.logical_regions().contains_key(&logical_region_id)
}