feat: meta provides the ability to distribute lock (#961)

* add DistLock trait and a implement based etcd

wip

impl lock grpc service for meta-srv

reuse the etcd client instead of repeatedly creating etcd client

add some docs and comments

add some comment

meta client support distribute lock

fix: dead lock

self-cr

* cr

* rename "expire" -> "expire_secs"
This commit is contained in:
fys
2023-02-13 15:58:30 +08:00
committed by GitHub
parent be897efd01
commit c1a9f84c7f
18 changed files with 712 additions and 12 deletions

2
Cargo.lock generated
View File

@@ -2973,7 +2973,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=966161508646f575801bcf05f47ed283ec231d68#966161508646f575801bcf05f47ed283ec231d68"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3e6349be127b65a8b42a38cda9d527ec423ca77d#3e6349be127b65a8b42a38cda9d527ec423ca77d"
dependencies = [
"prost 0.11.6",
"tonic",

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "966161508646f575801bcf05f47ed283ec231d68" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3e6349be127b65a8b42a38cda9d527ec423ca77d" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -0,0 +1,125 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::rpc::lock::{LockRequest, UnlockRequest};
use tracing::{info, subscriber};
use tracing_subscriber::FmtSubscriber;
fn main() {
subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap();
run();
}
#[tokio::main]
async fn run() {
let id = (1000u64, 2000u64);
let config = ChannelConfig::new()
.timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new(id.0, id.1)
.enable_lock()
.channel_manager(channel_manager)
.build();
meta_client.start(&["127.0.0.1:3002"]).await.unwrap();
run_normal(meta_client.clone()).await;
run_multi_thread(meta_client.clone()).await;
run_multi_thread_with_one_timeout(meta_client).await;
}
async fn run_normal(meta_client: MetaClient) {
let name = "lock_name".as_bytes().to_vec();
let expire_secs = 60;
let lock_req = LockRequest { name, expire_secs };
let lock_result = meta_client.lock(lock_req).await.unwrap();
let key = lock_result.key;
info!(
"lock success! Returned key: {}",
String::from_utf8(key.clone()).unwrap()
);
// It is recommended that time of holding lock is less than the timeout of the grpc channel
info!("do some work, take 3 seconds");
tokio::time::sleep(Duration::from_secs(3)).await;
let unlock_req = UnlockRequest { key };
meta_client.unlock(unlock_req).await.unwrap();
info!("unlock success!");
}
async fn run_multi_thread(meta_client: MetaClient) {
let meta_client_clone = meta_client.clone();
let join1 = tokio::spawn(async move {
run_normal(meta_client_clone.clone()).await;
});
tokio::time::sleep(Duration::from_secs(1)).await;
let join2 = tokio::spawn(async move {
run_normal(meta_client).await;
});
join1.await.unwrap();
join2.await.unwrap();
}
async fn run_multi_thread_with_one_timeout(meta_client: MetaClient) {
let meta_client_clone = meta_client.clone();
let join1 = tokio::spawn(async move {
run_with_timeout(meta_client_clone.clone()).await;
});
tokio::time::sleep(Duration::from_secs(1)).await;
let join2 = tokio::spawn(async move {
run_normal(meta_client).await;
});
join1.await.unwrap();
join2.await.unwrap();
}
async fn run_with_timeout(meta_client: MetaClient) {
let name = "lock_name".as_bytes().to_vec();
let expire_secs = 5;
let lock_req = LockRequest { name, expire_secs };
let lock_result = meta_client.lock(lock_req).await.unwrap();
let key = lock_result.key;
info!(
"lock success! Returned key: {}",
String::from_utf8(key.clone()).unwrap()
);
// It is recommended that time of holding lock is less than the timeout of the grpc channel
info!("do some work, take 20 seconds");
tokio::time::sleep(Duration::from_secs(20)).await;
let unlock_req = UnlockRequest { key };
meta_client.unlock(unlock_req).await.unwrap();
info!("unlock success!");
}

View File

@@ -14,12 +14,14 @@
mod heartbeat;
mod load_balance;
mod lock;
mod router;
mod store;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::info;
use heartbeat::Client as HeartbeatClient;
use lock::Client as LockClient;
use router::Client as RouterClient;
use snafu::OptionExt;
use store::Client as StoreClient;
@@ -27,6 +29,7 @@ use store::Client as StoreClient;
pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::error;
use crate::error::Result;
use crate::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use crate::rpc::router::DeleteRequest;
use crate::rpc::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, CreateRequest,
@@ -42,6 +45,7 @@ pub struct MetaClientBuilder {
enable_heartbeat: bool,
enable_router: bool,
enable_store: bool,
enable_lock: bool,
channel_manager: Option<ChannelManager>,
}
@@ -74,6 +78,13 @@ impl MetaClientBuilder {
}
}
pub fn enable_lock(self) -> Self {
Self {
enable_lock: true,
..self
}
}
pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
Self {
channel_manager: Some(channel_manager),
@@ -88,9 +99,7 @@ impl MetaClientBuilder {
MetaClient::new(self.id)
};
if let (false, false, false) =
(self.enable_heartbeat, self.enable_router, self.enable_store)
{
if !(self.enable_heartbeat || self.enable_router || self.enable_store || self.enable_lock) {
panic!("At least one client needs to be enabled.")
}
@@ -103,7 +112,10 @@ impl MetaClientBuilder {
client.router = Some(RouterClient::new(self.id, mgr.clone()));
}
if self.enable_store {
client.store = Some(StoreClient::new(self.id, mgr));
client.store = Some(StoreClient::new(self.id, mgr.clone()));
}
if self.enable_lock {
client.lock = Some(LockClient::new(self.id, mgr));
}
client
@@ -117,6 +129,7 @@ pub struct MetaClient {
heartbeat: Option<HeartbeatClient>,
router: Option<RouterClient>,
store: Option<StoreClient>,
lock: Option<LockClient>,
}
impl MetaClient {
@@ -151,10 +164,15 @@ impl MetaClient {
info!("Router client started");
}
if let Some(client) = &mut self.store {
client.start(urls).await?;
client.start(urls.clone()).await?;
info!("Store client started");
}
if let Some(client) = &mut self.lock {
client.start(urls).await?;
info!("Lock client started");
}
Ok(())
}
@@ -260,6 +278,15 @@ impl MetaClient {
.try_into()
}
pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
self.lock_client()?.lock(req.into()).await.map(Into::into)
}
pub async fn unlock(&self, req: UnlockRequest) -> Result<()> {
self.lock_client()?.unlock(req.into()).await?;
Ok(())
}
#[inline]
pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
self.heartbeat.clone().context(error::NotStartedSnafu {
@@ -281,6 +308,13 @@ impl MetaClient {
})
}
#[inline]
pub fn lock_client(&self) -> Result<LockClient> {
self.lock.clone().context(error::NotStartedSnafu {
name: "lock_client",
})
}
#[inline]
pub fn channel_config(&self) -> &ChannelConfig {
self.channel_manager.config()

View File

@@ -0,0 +1,184 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::lock_client::LockClient;
use api::v1::meta::{LockRequest, LockResponse, UnlockRequest, UnlockResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::client::{load_balance, Id};
use crate::error;
use crate::error::Result;
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
impl Client {
pub fn new(id: Id, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
channel_manager,
peers: vec![],
}));
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
let inner = self.inner.read().await;
inner.lock(req).await
}
pub async fn unlock(&self, req: UnlockRequest) -> Result<UnlockResponse> {
let inner = self.inner.read().await;
inner.unlock(req).await
}
}
#[derive(Debug)]
struct Inner {
id: Id,
channel_manager: ChannelManager,
peers: Vec<String>,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Lock client already started",
}
);
self.peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<HashSet<_>>()
.drain()
.collect::<Vec<_>>();
Ok(())
}
fn random_client(&self) -> Result<LockClient<Channel>> {
let len = self.peers.len();
let peer = load_balance::random_get(len, |i| Some(&self.peers[i])).context(
error::IllegalGrpcClientStateSnafu {
err_msg: "Empty peers, lock client may not start yet",
},
)?;
self.make_client(peer)
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<LockClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(LockClient::new(channel))
}
#[inline]
fn is_started(&self) -> bool {
!self.peers.is_empty()
}
async fn lock(&self, mut req: LockRequest) -> Result<LockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
let res = client.lock(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn unlock(&self, mut req: UnlockRequest) -> Result<UnlockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
let res = client.unlock(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(
res.err(),
Some(error::Error::IllegalGrpcClientState { .. })
));
}
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new((0, 0), ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod lock;
pub mod router;
mod store;
pub mod util;

View File

@@ -0,0 +1,115 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{
LockRequest as PbLockRequest, LockResponse as PbLockResponse, UnlockRequest as PbUnlockRequest,
};
#[derive(Debug)]
pub struct LockRequest {
pub name: Vec<u8>,
pub expire_secs: i64,
}
impl From<LockRequest> for PbLockRequest {
fn from(req: LockRequest) -> Self {
Self {
header: None,
name: req.name,
expire_secs: req.expire_secs,
}
}
}
#[derive(Debug)]
pub struct LockResponse {
pub key: Vec<u8>,
}
impl From<PbLockResponse> for LockResponse {
fn from(resp: PbLockResponse) -> Self {
Self { key: resp.key }
}
}
#[derive(Debug)]
pub struct UnlockRequest {
pub key: Vec<u8>,
}
impl From<UnlockRequest> for PbUnlockRequest {
fn from(req: UnlockRequest) -> Self {
Self {
header: None,
key: req.key.to_vec(),
}
}
}
#[cfg(test)]
mod tests {
use api::v1::meta::{
LockRequest as PbLockRequest, LockResponse as PbLockResponse,
UnlockRequest as PbUnlockRequest,
};
use super::LockRequest;
use crate::rpc::lock::{LockResponse, UnlockRequest};
#[test]
fn test_convert_lock_req() {
let lock_req = LockRequest {
name: "lock_1".as_bytes().to_vec(),
expire_secs: 1,
};
let pb_lock_req: PbLockRequest = lock_req.into();
let expected = PbLockRequest {
header: None,
name: "lock_1".as_bytes().to_vec(),
expire_secs: 1,
};
assert_eq!(expected, pb_lock_req);
}
#[test]
fn test_convert_unlock_req() {
let unlock_req = UnlockRequest {
key: "lock_1_12378123".as_bytes().to_vec(),
};
let pb_unlock_req: PbUnlockRequest = unlock_req.into();
let expected = PbUnlockRequest {
header: None,
key: "lock_1_12378123".as_bytes().to_vec(),
};
assert_eq!(expected, pb_unlock_req);
}
#[test]
fn test_convert_lock_response() {
let pb_lock_resp = PbLockResponse {
header: None,
key: "lock_1_12378123".as_bytes().to_vec(),
};
let lock_resp: LockResponse = pb_lock_resp.into();
let expected_key = "lock_1_12378123".as_bytes().to_vec();
assert_eq!(expected_key, lock_resp.key);
}
}

View File

@@ -16,8 +16,10 @@ use std::sync::Arc;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use etcd_client::Client;
use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
@@ -25,6 +27,7 @@ use tonic::transport::server::Router;
use crate::cluster::MetaPeerClient;
use crate::election::etcd::EtcdElection;
use crate::lock::etcd::EtcdLock;
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
@@ -65,16 +68,25 @@ pub fn router(meta_srv: MetaSrv) -> Router {
.add_service(RouterServer::new(meta_srv.clone()))
.add_service(StoreServer::new(meta_srv.clone()))
.add_service(ClusterServer::new(meta_srv.clone()))
.add_service(LockServer::new(meta_srv.clone()))
.add_service(admin::make_admin_service(meta_srv))
}
pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
let (kv_store, election) = if opts.use_memory_store {
(Arc::new(MemStore::new()) as _, None)
let (kv_store, election, lock) = if opts.use_memory_store {
(Arc::new(MemStore::new()) as _, None, None)
} else {
let etcd_endpoints = [&opts.store_addr];
let etcd_client = Client::connect(etcd_endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;
(
EtcdStore::with_endpoints([&opts.store_addr]).await?,
Some(EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?),
EtcdStore::with_etcd_client(etcd_client.clone())?,
Some(EtcdElection::with_etcd_client(
&opts.server_addr,
etcd_client.clone(),
)?),
Some(EtcdLock::with_etcd_client(etcd_client)?),
)
};
@@ -95,6 +107,7 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
.selector(selector)
.election(election)
.meta_peer_client(meta_peer_client)
.lock(lock)
.build()
.await;

View File

@@ -38,11 +38,19 @@ impl EtcdElection {
E: AsRef<str>,
S: AsRef<[E]>,
{
let leader_value = leader_value.as_ref().into();
let client = Client::connect(endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;
Self::with_etcd_client(leader_value, client)
}
pub fn with_etcd_client<E>(leader_value: E, client: Client) -> Result<ElectionRef>
where
E: AsRef<str>,
{
let leader_value = leader_value.as_ref().into();
Ok(Arc::new(Self {
leader_value,
client,

View File

@@ -218,6 +218,27 @@ pub enum Error {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to lock based on etcd, source: {}", source))]
Lock {
source: etcd_client::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to unlock based on etcd, source: {}", source))]
Unlock {
source: etcd_client::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to grant lease, source: {}", source))]
LeaseGrant {
source: etcd_client::Error,
backtrace: Backtrace,
},
#[snafu(display("Distributed lock is not configured"))]
LockNotConfig { backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -254,6 +275,10 @@ impl ErrorExt for Error {
| Error::IsNotLeader { .. }
| Error::NoMetaPeerClient { .. }
| Error::InvalidHttpBody { .. }
| Error::Lock { .. }
| Error::Unlock { .. }
| Error::LeaseGrant { .. }
| Error::LockNotConfig { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::EmptyTableName { .. }

View File

@@ -20,6 +20,7 @@ pub mod error;
pub mod handler;
pub mod keys;
pub mod lease;
pub mod lock;
pub mod metasrv;
#[cfg(feature = "mock")]
pub mod mocks;

41
src/meta-srv/src/lock.rs Normal file
View File

@@ -0,0 +1,41 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod etcd;
use std::sync::Arc;
use crate::error::Result;
pub type Key = Vec<u8>;
pub const DEFAULT_EXPIRE_TIME_SECS: u64 = 10;
pub struct Opts {
// If the expiration time is exceeded and currently holds the lock, the lock is
// automatically released.
pub expire_secs: Option<u64>,
}
#[async_trait::async_trait]
pub trait DistLock: Send + Sync {
// Lock acquires a distributed shared lock on a given named lock. On success, it
// will return a unique key that exists so long as the lock is held by the caller.
async fn lock(&self, name: Vec<u8>, opts: Opts) -> Result<Key>;
// Unlock takes a key returned by Lock and releases the hold on lock.
async fn unlock(&self, key: Vec<u8>) -> Result<()>;
}
pub type DistLockRef = Arc<dyn DistLock>;

View File

@@ -0,0 +1,76 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use etcd_client::{Client, LockOptions};
use snafu::ResultExt;
use super::{DistLock, DistLockRef, Opts, DEFAULT_EXPIRE_TIME_SECS};
use crate::error;
use crate::error::Result;
/// A implementation of distributed lock based on etcd. The Clone of EtcdLock is cheap.
#[derive(Clone)]
pub struct EtcdLock {
client: Client,
}
impl EtcdLock {
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<DistLockRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
{
let client = Client::connect(endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;
Self::with_etcd_client(client)
}
pub fn with_etcd_client(client: Client) -> Result<DistLockRef> {
Ok(Arc::new(EtcdLock { client }))
}
}
#[async_trait::async_trait]
impl DistLock for EtcdLock {
async fn lock(&self, name: Vec<u8>, opts: Opts) -> Result<Vec<u8>> {
let expire = opts.expire_secs.unwrap_or(DEFAULT_EXPIRE_TIME_SECS) as i64;
let mut client = self.client.clone();
let resp = client
.lease_grant(expire, None)
.await
.context(error::LeaseGrantSnafu)?;
let lease_id = resp.id();
let lock_opts = LockOptions::new().with_lease(lease_id);
let resp = client
.lock(name, Some(lock_opts))
.await
.context(error::LockSnafu)?;
Ok(resp.key().to_vec())
}
async fn unlock(&self, key: Vec<u8>) -> Result<()> {
let mut client = self.client.clone();
let _ = client.unlock(key).await.context(error::UnlockSnafu)?;
Ok(())
}
}

View File

@@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
use crate::cluster::MetaPeerClient;
use crate::election::Election;
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::selector::{Selector, SelectorType};
use crate::sequence::SequenceRef;
use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef};
@@ -99,6 +100,7 @@ pub struct MetaSrv {
handler_group: HeartbeatHandlerGroup,
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClient>,
lock: Option<DistLockRef>,
}
impl MetaSrv {
@@ -174,6 +176,11 @@ impl MetaSrv {
self.meta_peer_client.clone()
}
#[inline]
pub fn lock(&self) -> Option<DistLockRef> {
self.lock.clone()
}
#[inline]
pub fn new_ctx(&self) -> Context {
let datanode_lease_secs = self.options().datanode_lease_secs;

View File

@@ -20,6 +20,7 @@ use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler,
OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler,
};
use crate::lock::DistLockRef;
use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
@@ -35,6 +36,7 @@ pub struct MetaSrvBuilder {
handler_group: Option<HeartbeatHandlerGroup>,
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClient>,
lock: Option<DistLockRef>,
}
impl MetaSrvBuilder {
@@ -47,6 +49,7 @@ impl MetaSrvBuilder {
meta_peer_client: None,
election: None,
options: None,
lock: None,
}
}
@@ -85,6 +88,11 @@ impl MetaSrvBuilder {
self
}
pub fn lock(mut self, lock: Option<DistLockRef>) -> Self {
self.lock = lock;
self
}
pub async fn build(self) -> MetaSrv {
let started = Arc::new(AtomicBool::new(false));
@@ -96,6 +104,7 @@ impl MetaSrvBuilder {
in_memory,
selector,
handler_group,
lock,
} = self;
let options = options.unwrap_or_default();
@@ -136,6 +145,7 @@ impl MetaSrvBuilder {
handler_group,
election,
meta_peer_client,
lock,
}
}
}

View File

@@ -20,6 +20,7 @@ use tonic::{Response, Status};
pub mod admin;
pub mod cluster;
mod heartbeat;
pub mod lock;
pub mod router;
pub mod store;

View File

@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{lock_server, LockRequest, LockResponse, UnlockRequest, UnlockResponse};
use snafu::OptionExt;
use tonic::{Request, Response};
use super::GrpcResult;
use crate::error;
use crate::lock::Opts;
use crate::metasrv::MetaSrv;
#[async_trait::async_trait]
impl lock_server::Lock for MetaSrv {
async fn lock(&self, request: Request<LockRequest>) -> GrpcResult<LockResponse> {
let LockRequest {
name, expire_secs, ..
} = request.into_inner();
let expire_secs = Some(expire_secs as u64);
let lock = self.lock().context(error::LockNotConfigSnafu)?;
let key = lock.lock(name, Opts { expire_secs }).await?;
let resp = LockResponse {
key,
..Default::default()
};
Ok(Response::new(resp))
}
async fn unlock(&self, request: Request<UnlockRequest>) -> GrpcResult<UnlockResponse> {
let UnlockRequest { key, .. } = request.into_inner();
let lock = self.lock().context(error::LockNotConfigSnafu)?;
let _ = lock.unlock(key).await?;
let resp = UnlockResponse {
..Default::default()
};
Ok(Response::new(resp))
}
}

View File

@@ -43,6 +43,10 @@ impl EtcdStore {
.await
.context(error::ConnectEtcdSnafu)?;
Self::with_etcd_client(client)
}
pub fn with_etcd_client(client: Client) -> Result<KvStoreRef> {
Ok(Arc::new(Self { client }))
}
}