mirror of https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 20:32:56 +00:00

Compare commits (1 commit): cache-logi...v0.10.0-ni

Commit: 856c0280f5
@@ -1,129 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::Role;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::rpc::lock::{LockRequest, UnlockRequest};
use meta_client::client::MetaClientBuilder;
use meta_client::MetaClientRef;
use tracing::{info, subscriber};
use tracing_subscriber::FmtSubscriber;

fn main() {
    subscriber::set_global_default(FmtSubscriber::builder().finish()).unwrap();
    run();
}

#[tokio::main]
async fn run() {
    let id = (1000u64, 2000u64);
    let config = ChannelConfig::new()
        .timeout(Duration::from_secs(30))
        .connect_timeout(Duration::from_secs(5))
        .tcp_nodelay(true);
    let channel_manager = ChannelManager::with_config(config);
    let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode)
        .enable_lock()
        .channel_manager(channel_manager)
        .build();
    meta_client.start(&["127.0.0.1:3002"]).await.unwrap();
    let meta_client = Arc::new(meta_client);

    run_normal(meta_client.clone()).await;

    run_multi_thread(meta_client.clone()).await;

    run_multi_thread_with_one_timeout(meta_client).await;
}

async fn run_normal(meta_client: MetaClientRef) {
    let name = "lock_name".as_bytes().to_vec();
    let expire_secs = 60;

    let lock_req = LockRequest { name, expire_secs };

    let lock_result = meta_client.lock(lock_req).await.unwrap();
    let key = lock_result.key;
    info!(
        "lock success! Returned key: {}",
        String::from_utf8(key.clone()).unwrap()
    );

    // It is recommended to hold the lock for less than the timeout of the grpc channel.
    info!("do some work, take 3 seconds");
    tokio::time::sleep(Duration::from_secs(3)).await;

    let unlock_req = UnlockRequest { key };

    meta_client.unlock(unlock_req).await.unwrap();
    info!("unlock success!");
}

async fn run_multi_thread(meta_client: MetaClientRef) {
    let meta_client_clone = meta_client.clone();
    let join1 = tokio::spawn(async move {
        run_normal(meta_client_clone.clone()).await;
    });

    tokio::time::sleep(Duration::from_secs(1)).await;

    let join2 = tokio::spawn(async move {
        run_normal(meta_client).await;
    });

    join1.await.unwrap();
    join2.await.unwrap();
}

async fn run_multi_thread_with_one_timeout(meta_client: MetaClientRef) {
    let meta_client_clone = meta_client.clone();
    let join1 = tokio::spawn(async move {
        run_with_timeout(meta_client_clone.clone()).await;
    });

    tokio::time::sleep(Duration::from_secs(1)).await;

    let join2 = tokio::spawn(async move {
        run_normal(meta_client).await;
    });

    join1.await.unwrap();
    join2.await.unwrap();
}

async fn run_with_timeout(meta_client: MetaClientRef) {
    let name = "lock_name".as_bytes().to_vec();
    let expire_secs = 5;

    let lock_req = LockRequest { name, expire_secs };

    let lock_result = meta_client.lock(lock_req).await.unwrap();
    let key = lock_result.key;
    info!(
        "lock success! Returned key: {}",
        String::from_utf8(key.clone()).unwrap()
    );

    // It is recommended to hold the lock for less than the timeout of the grpc channel.
    info!("do some work, take 20 seconds");
    tokio::time::sleep(Duration::from_secs(20)).await;

    let unlock_req = UnlockRequest { key };

    meta_client.unlock(unlock_req).await.unwrap();
    info!("unlock success!");
}
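The example's comments advise holding the lock for less than the gRPC channel timeout (30 s above). A minimal sketch of one way to enforce such a bound, assuming the same types as the example; do_work and the 10 s budget are illustrative, not part of the original:

use std::time::Duration;

// Illustrative stand-in for the critical section.
async fn do_work() {}

// Sketch: cap the critical section below the channel timeout so the unlock
// request is still sent over a healthy channel even if the work stalls.
async fn run_with_bounded_work(meta_client: MetaClientRef) {
    let name = "lock_name".as_bytes().to_vec();
    let lock_req = LockRequest { name, expire_secs: 60 };
    let key = meta_client.lock(lock_req).await.unwrap().key;

    // On timeout the work is abandoned; either way we fall through to unlock.
    let _ = tokio::time::timeout(Duration::from_secs(10), do_work()).await;

    meta_client.unlock(UnlockRequest { key }).await.unwrap();
}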
@@ -15,7 +15,6 @@
 mod ask_leader;
 mod heartbeat;
 mod load_balance;
-mod lock;
 mod procedure;

 mod cluster;
@@ -33,7 +32,6 @@ use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
 use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
 use common_meta::error::{self as meta_error, Result as MetaResult};
 use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
-use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
 use common_meta::rpc::procedure::{
     MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
 };
@@ -45,7 +43,6 @@ use common_meta::rpc::store::{
 use common_meta::ClusterId;
 use common_telemetry::info;
 use heartbeat::Client as HeartbeatClient;
-use lock::Client as LockClient;
 use procedure::Client as ProcedureClient;
 use snafu::{OptionExt, ResultExt};
 use store::Client as StoreClient;
@@ -67,7 +64,6 @@ pub struct MetaClientBuilder {
     role: Role,
     enable_heartbeat: bool,
     enable_store: bool,
-    enable_lock: bool,
     enable_procedure: bool,
     enable_access_cluster_info: bool,
     channel_manager: Option<ChannelManager>,
@@ -123,13 +119,6 @@ impl MetaClientBuilder {
         }
     }

-    pub fn enable_lock(self) -> Self {
-        Self {
-            enable_lock: true,
-            ..self
-        }
-    }
-
     pub fn enable_procedure(self) -> Self {
         Self {
             enable_procedure: true,
@@ -188,10 +177,6 @@ impl MetaClientBuilder {
             client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
         }

-        if self.enable_lock {
-            client.lock = Some(LockClient::new(self.id, self.role, mgr.clone()));
-        }
-
        if self.enable_procedure {
            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
            client.procedure = Some(ProcedureClient::new(
@@ -221,7 +206,6 @@ pub struct MetaClient {
     channel_manager: ChannelManager,
     heartbeat: Option<HeartbeatClient>,
     store: Option<StoreClient>,
-    lock: Option<LockClient>,
     procedure: Option<ProcedureClient>,
     cluster: Option<ClusterClient>,
 }
@@ -383,10 +367,6 @@ impl MetaClient {
             client.start(urls.clone()).await?;
             info!("Store client started");
         }
-        if let Some(client) = &mut self.lock {
-            client.start(urls.clone()).await?;
-            info!("Lock client started");
-        }
         if let Some(client) = &mut self.procedure {
             client.start(urls.clone()).await?;
             info!("DDL client started");
@@ -482,15 +462,6 @@
             .context(ConvertMetaResponseSnafu)
     }

-    pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
-        self.lock_client()?.lock(req.into()).await.map(Into::into)
-    }
-
-    pub async fn unlock(&self, req: UnlockRequest) -> Result<()> {
-        let _ = self.lock_client()?.unlock(req.into()).await?;
-        Ok(())
-    }
-
     /// Query the procedure state by its id.
     pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
         self.procedure_client()?.query_procedure_state(pid).await
@@ -538,12 +509,6 @@
         })
     }

-    pub fn lock_client(&self) -> Result<LockClient> {
-        self.lock.clone().context(NotStartedSnafu {
-            name: "lock_client",
-        })
-    }
-
     pub fn procedure_client(&self) -> Result<ProcedureClient> {
         self.procedure.clone().context(NotStartedSnafu {
             name: "procedure_client",
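The removed lock_client accessor shares its shape with the surviving procedure_client: snafu's OptionExt::context turns an unset Option into a typed "not started" error. A self-contained sketch of that pattern; the types here are illustrative, not GreptimeDB's:

use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display("Client '{name}' not started"))]
struct NotStarted {
    name: String,
}

struct Clients {
    lock: Option<String>, // stand-in for Option<LockClient>
}

impl Clients {
    // None becomes a NotStarted error, mirroring lock_client() in the diff above.
    fn lock_client(&self) -> Result<String, NotStarted> {
        self.lock.clone().context(NotStartedSnafu { name: "lock_client" })
    }
}

fn main() {
    let clients = Clients { lock: None };
    assert!(clients.lock_client().is_err());
}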
@@ -1,178 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use api::v1::meta::lock_client::LockClient;
use api::v1::meta::{LockRequest, LockResponse, Role, UnlockRequest, UnlockResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;

use crate::client::{load_balance, Id};
use crate::error;
use crate::error::Result;

#[derive(Clone, Debug)]
pub struct Client {
    inner: Arc<RwLock<Inner>>,
}

impl Client {
    pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
        let inner = Arc::new(RwLock::new(Inner {
            id,
            role,
            channel_manager,
            peers: vec![],
        }));

        Self { inner }
    }

    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
    where
        U: AsRef<str>,
        A: AsRef<[U]>,
    {
        let mut inner = self.inner.write().await;
        inner.start(urls).await
    }

    pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
        let inner = self.inner.read().await;
        inner.lock(req).await
    }

    pub async fn unlock(&self, req: UnlockRequest) -> Result<UnlockResponse> {
        let inner = self.inner.read().await;
        inner.unlock(req).await
    }
}

#[derive(Debug)]
struct Inner {
    id: Id,
    role: Role,
    channel_manager: ChannelManager,
    peers: Vec<String>,
}

impl Inner {
    async fn start<U, A>(&mut self, urls: A) -> Result<()>
    where
        U: AsRef<str>,
        A: AsRef<[U]>,
    {
        ensure!(
            !self.is_started(),
            error::IllegalGrpcClientStateSnafu {
                err_msg: "Lock client already started",
            }
        );

        self.peers = urls
            .as_ref()
            .iter()
            .map(|url| url.as_ref().to_string())
            .collect::<HashSet<_>>()
            .drain()
            .collect::<Vec<_>>();

        Ok(())
    }

    fn random_client(&self) -> Result<LockClient<Channel>> {
        let len = self.peers.len();
        let peer = load_balance::random_get(len, |i| Some(&self.peers[i])).context(
            error::IllegalGrpcClientStateSnafu {
                err_msg: "Empty peers, lock client may not start yet",
            },
        )?;

        self.make_client(peer)
    }

    fn make_client(&self, addr: impl AsRef<str>) -> Result<LockClient<Channel>> {
        let channel = self
            .channel_manager
            .get(addr)
            .context(error::CreateChannelSnafu)?;

        Ok(LockClient::new(channel))
    }

    #[inline]
    fn is_started(&self) -> bool {
        !self.peers.is_empty()
    }

    async fn lock(&self, mut req: LockRequest) -> Result<LockResponse> {
        let mut client = self.random_client()?;
        req.set_header(
            self.id,
            self.role,
            TracingContext::from_current_span().to_w3c(),
        );
        let res = client.lock(req).await.map_err(error::Error::from)?;

        Ok(res.into_inner())
    }

    async fn unlock(&self, mut req: UnlockRequest) -> Result<UnlockResponse> {
        let mut client = self.random_client()?;
        req.set_header(
            self.id,
            self.role,
            TracingContext::from_current_span().to_w3c(),
        );
        let res = client.unlock(req).await.map_err(error::Error::from)?;

        Ok(res.into_inner())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_already_start() {
        let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
        client
            .start(&["127.0.0.1:1000", "127.0.0.1:1001"])
            .await
            .unwrap();
        let res = client.start(&["127.0.0.1:1002"]).await;
        assert!(res.is_err());
        assert!(matches!(
            res.err(),
            Some(error::Error::IllegalGrpcClientState { .. })
        ));
    }

    #[tokio::test]
    async fn test_start_with_duplicate_peers() {
        let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
        client
            .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
            .await
            .unwrap();

        assert_eq!(1, client.inner.write().await.peers.len());
    }
}
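random_client above delegates peer selection to load_balance::random_get, which this diff does not show. A plausible minimal sketch of such a helper (an assumption about its shape, not the actual implementation):

use rand::Rng;

// Pick a random index in [0, len) and map it through `get`; None when empty.
fn random_get<T>(len: usize, get: impl Fn(usize) -> Option<T>) -> Option<T> {
    if len == 0 {
        return None;
    }
    let i = rand::thread_rng().gen_range(0..len);
    get(i)
}

fn main() {
    let peers = vec!["a", "b", "c"];
    assert!(random_get(peers.len(), |i| Some(&peers[i])).is_some());
}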
@@ -16,7 +16,6 @@ use std::sync::Arc;

 use api::v1::meta::cluster_server::ClusterServer;
 use api::v1::meta::heartbeat_server::HeartbeatServer;
-use api::v1::meta::lock_server::LockServer;
 use api::v1::meta::procedure_service_server::ProcedureServiceServer;
 use api::v1::meta::store_server::StoreServer;
 use common_base::Plugins;
@@ -48,8 +47,6 @@ use crate::election::etcd::EtcdElection;
 #[cfg(feature = "pg_kvbackend")]
 use crate::error::InvalidArgumentsSnafu;
 use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
-use crate::lock::etcd::EtcdLock;
-use crate::lock::memory::MemLock;
 use crate::metasrv::builder::MetasrvBuilder;
 use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
 use crate::selector::lease_based::LeaseBasedSelector;
@@ -183,7 +180,6 @@ pub fn router(metasrv: Metasrv) -> Router {
         .add_service(HeartbeatServer::new(metasrv.clone()))
         .add_service(StoreServer::new(metasrv.clone()))
         .add_service(ClusterServer::new(metasrv.clone()))
-        .add_service(LockServer::new(metasrv.clone()))
         .add_service(ProcedureServiceServer::new(metasrv.clone()))
         .add_service(admin::make_admin_service(metasrv))
 }
@@ -193,13 +189,9 @@ pub async fn metasrv_builder(
     plugins: Plugins,
     kv_backend: Option<KvBackendRef>,
 ) -> Result<MetasrvBuilder> {
-    let (kv_backend, election, lock) = match (kv_backend, &opts.backend) {
-        (Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)),
-        (None, BackendImpl::MemoryStore) => (
-            Arc::new(MemoryKvBackend::new()) as _,
-            None,
-            Some(Arc::new(MemLock::default()) as _),
-        ),
+    let (kv_backend, election) = match (kv_backend, &opts.backend) {
+        (Some(kv_backend), _) => (kv_backend, None),
+        (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
         (None, BackendImpl::EtcdStore) => {
             let etcd_client = create_etcd_client(opts).await?;
             let kv_backend = {
@@ -224,18 +216,13 @@ pub async fn metasrv_builder(
                     )
                     .await?,
                 ),
-                Some(EtcdLock::with_etcd_client(
-                    etcd_client,
-                    opts.store_key_prefix.clone(),
-                )?),
             )
         }
         #[cfg(feature = "pg_kvbackend")]
         (None, BackendImpl::PostgresStore) => {
             let pg_client = create_postgres_client(opts).await?;
             let kv_backend = PgStore::with_pg_client(pg_client).await.unwrap();
             // TODO: implement locking and leader election for pg backend.
-            (kv_backend, None, None)
+            (kv_backend, None)
         }
     };
@@ -253,7 +240,6 @@ pub async fn metasrv_builder(
         .in_memory(in_memory)
         .selector(selector)
         .election(election)
-        .lock(lock)
         .plugins(plugins))
 }
@@ -448,30 +448,6 @@
         location: Location,
     },

-    #[snafu(display("Failed to lock based on etcd"))]
-    Lock {
-        #[snafu(source)]
-        error: etcd_client::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Failed to unlock based on etcd"))]
-    Unlock {
-        #[snafu(source)]
-        error: etcd_client::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Failed to grant lease"))]
-    LeaseGrant {
-        #[snafu(source)]
-        error: etcd_client::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Invalid utf-8 value"))]
     InvalidUtf8Value {
         #[snafu(source)]
@@ -770,9 +746,6 @@ impl ErrorExt for Error {
             | Error::ResponseHeaderNotFound { .. }
             | Error::IsNotLeader { .. }
             | Error::InvalidHttpBody { .. }
-            | Error::Lock { .. }
-            | Error::Unlock { .. }
-            | Error::LeaseGrant { .. }
             | Error::ExceededRetryLimit { .. }
             | Error::SendShutdownSignal { .. }
             | Error::PusherNotFound { .. }
@@ -28,7 +28,6 @@ pub mod flow_meta_alloc;
 pub mod handler;
 pub mod key;
 pub mod lease;
-pub mod lock;
 pub mod metasrv;
 mod metrics;
 #[cfg(feature = "mock")]
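The removed variants follow snafu's source-wrapping shape: an external etcd_client::Error is attached as the source of a domain error via ResultExt::context. A standalone sketch of the same shape, substituting std::io::Error for the etcd error type:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    // Mirrors the removed variant: the external failure becomes `source`.
    #[snafu(display("Failed to lock based on etcd"))]
    Lock { source: std::io::Error },
}

// Stand-in for a fallible etcd call.
fn etcd_lock() -> Result<(), std::io::Error> {
    Err(std::io::Error::new(std::io::ErrorKind::Other, "boom"))
}

// `.context(LockSnafu)` wraps the low-level error, as the removed
// `.context(error::LockSnafu)` calls did in the etcd lock code.
fn do_lock() -> Result<(), Error> {
    etcd_lock().context(LockSnafu)
}

fn main() {
    assert!(do_lock().is_err());
}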
@@ -1,99 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod etcd;
pub(crate) mod memory;

use std::sync::Arc;

use common_telemetry::error;

use crate::error::Result;

pub type Key = Vec<u8>;

pub const DEFAULT_EXPIRE_TIME_SECS: u64 = 10;

pub struct Opts {
    // If the expiration time is exceeded while the lock is still held, the lock
    // is automatically released.
    pub expire_secs: Option<u64>,
}

impl Default for Opts {
    fn default() -> Self {
        Opts {
            expire_secs: Some(DEFAULT_EXPIRE_TIME_SECS),
        }
    }
}

#[async_trait::async_trait]
pub trait DistLock: Send + Sync {
    // Lock acquires a distributed shared lock on a given named lock. On success, it
    // will return a unique key that exists so long as the lock is held by the caller.
    async fn lock(&self, name: Vec<u8>, opts: Opts) -> Result<Key>;

    // Unlock takes a key returned by Lock and releases the hold on lock.
    async fn unlock(&self, key: Vec<u8>) -> Result<()>;
}

pub type DistLockRef = Arc<dyn DistLock>;

pub struct DistLockGuard<'a> {
    lock: &'a DistLockRef,
    name: Vec<u8>,
    key: Option<Key>,
}

impl<'a> DistLockGuard<'a> {
    pub fn new(lock: &'a DistLockRef, name: Vec<u8>) -> Self {
        Self {
            lock,
            name,
            key: None,
        }
    }

    pub async fn lock(&mut self) -> Result<()> {
        if self.key.is_some() {
            return Ok(());
        }
        let key = self
            .lock
            .lock(
                self.name.clone(),
                Opts {
                    expire_secs: Some(2),
                },
            )
            .await?;
        self.key = Some(key);
        Ok(())
    }
}

impl Drop for DistLockGuard<'_> {
    fn drop(&mut self) {
        if let Some(key) = self.key.take() {
            let lock = self.lock.clone();
            let name = self.name.clone();
            let _handle = common_runtime::spawn_global(async move {
                if let Err(e) = lock.unlock(key).await {
                    error!(e; "Failed to unlock '{}'", String::from_utf8_lossy(&name));
                }
            });
        }
    }
}
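DistLockGuard is RAII-style: once lock() succeeds, dropping the guard releases the lock from a spawned background task. A hedged usage sketch, assuming a DistLockRef is already at hand and with an illustrative lock name:

// Sketch only: `lock` would be an EtcdLock (or MemLock) behind DistLockRef.
async fn with_guard(lock: &DistLockRef) -> Result<()> {
    let mut guard = DistLockGuard::new(lock, b"table-ddl".to_vec());
    guard.lock().await?; // acquires with the guard's 2 s expiry
    // ... critical section ...
    Ok(())
    // `guard` drops here; Drop spawns a task that calls lock.unlock(key).
}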
@@ -1,93 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use etcd_client::{Client, LockOptions};
use snafu::ResultExt;

use super::{DistLock, DistLockRef, Opts, DEFAULT_EXPIRE_TIME_SECS};
use crate::error;
use crate::error::Result;

/// An implementation of distributed lock based on etcd. Cloning an EtcdLock is cheap.
#[derive(Clone)]
pub struct EtcdLock {
    client: Client,
    store_key_prefix: String,
}

impl EtcdLock {
    pub async fn with_endpoints<E, S>(endpoints: S, store_key_prefix: String) -> Result<DistLockRef>
    where
        E: AsRef<str>,
        S: AsRef<[E]>,
    {
        let client = Client::connect(endpoints, None)
            .await
            .context(error::ConnectEtcdSnafu)?;

        Self::with_etcd_client(client, store_key_prefix)
    }

    pub fn with_etcd_client(client: Client, store_key_prefix: String) -> Result<DistLockRef> {
        Ok(Arc::new(EtcdLock {
            client,
            store_key_prefix,
        }))
    }

    fn lock_key(&self, key: Vec<u8>) -> Vec<u8> {
        if self.store_key_prefix.is_empty() {
            key
        } else {
            let mut prefix = self.store_key_prefix.as_bytes().to_vec();
            prefix.extend_from_slice(&key);
            prefix
        }
    }
}

#[async_trait::async_trait]
impl DistLock for EtcdLock {
    async fn lock(&self, key: Vec<u8>, opts: Opts) -> Result<Vec<u8>> {
        let expire = opts.expire_secs.unwrap_or(DEFAULT_EXPIRE_TIME_SECS) as i64;

        let mut client = self.client.clone();

        let resp = client
            .lease_grant(expire, None)
            .await
            .context(error::LeaseGrantSnafu)?;

        let lease_id = resp.id();
        let lock_opts = LockOptions::new().with_lease(lease_id);

        let resp = client
            .lock(self.lock_key(key), Some(lock_opts))
            .await
            .context(error::LockSnafu)?;

        Ok(resp.key().to_vec())
    }

    async fn unlock(&self, key: Vec<u8>) -> Result<()> {
        let mut client = self.client.clone();
        let _ = client
            .unlock(self.lock_key(key))
            .await
            .context(error::UnlockSnafu)?;
        Ok(())
    }
}
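The etcd implementation ties every lock to a lease: lease_grant(expire, None) creates the lease and LockOptions::new().with_lease(lease_id) binds the lock to it, so a lock whose holder never unlocks expires with its lease. A hedged end-to-end sketch built only from the functions above; the endpoint, prefix, and key are illustrative:

async fn demo() -> Result<()> {
    // Connect to a local etcd; the prefix namespaces all lock keys.
    let lock = EtcdLock::with_endpoints(["127.0.0.1:2379"], "/greptime/".to_string()).await?;

    // Auto-released by etcd after ~5 s even if unlock is never called.
    let key = lock.lock(b"job-1".to_vec(), Opts { expire_secs: Some(5) }).await?;

    lock.unlock(key).await
}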
@@ -1,112 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use dashmap::DashMap;
use tokio::sync::{Mutex, OwnedMutexGuard};

use crate::error::Result;
use crate::lock::{DistLock, Key, Opts};

#[derive(Default)]
pub(crate) struct MemLock {
    mutexes: DashMap<Key, Arc<Mutex<()>>>,
    guards: DashMap<Key, OwnedMutexGuard<()>>,
}

#[async_trait]
impl DistLock for MemLock {
    async fn lock(&self, key: Vec<u8>, _opts: Opts) -> Result<Key> {
        let mutex = self
            .mutexes
            .entry(key.clone())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();

        let guard = mutex.lock_owned().await;

        let _ = self.guards.insert(key.clone(), guard);
        Ok(key)
    }

    async fn unlock(&self, key: Vec<u8>) -> Result<()> {
        // Drop the guard so that the mutex is unlocked, effectively allowing the
        // `mutex.lock_owned` in the `lock` method to proceed.
        let _ = self.guards.remove(&key);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicU32, Ordering};

    use rand::seq::SliceRandom;

    use super::*;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_mem_lock_concurrently() {
        let lock = Arc::new(MemLock::default());

        let keys = (0..10)
            .map(|i| format!("my-lock-{i}").into_bytes())
            .collect::<Vec<Key>>();
        let counters: [(Key, AtomicU32); 10] = keys
            .iter()
            .map(|x| (x.clone(), AtomicU32::new(0)))
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();
        let counters = Arc::new(HashMap::from(counters));

        let tasks = (0..100)
            .map(|_| {
                let mut keys = keys.clone();
                keys.shuffle(&mut rand::thread_rng());

                let lock_clone = lock.clone();
                let counters_clone = counters.clone();
                tokio::spawn(async move {
                    // Every key's counter is incremented by 1, 10 times per task.
                    for i in 0..100 {
                        let key = &keys[i % keys.len()];
                        assert!(lock_clone
                            .lock(key.clone(), Opts { expire_secs: None })
                            .await
                            .is_ok());

                        // Intentionally create a critical section:
                        // if our MemLock is flawed, the resulting counter is wrong.
                        //
                        // Note that AtomicU32 is only used to enable updates from
                        // multiple tasks; it makes no guarantee about the
                        // correctness of the result.
                        let counter = counters_clone.get(key).unwrap();
                        let v = counter.load(Ordering::Relaxed);
                        counter.store(v + 1, Ordering::Relaxed);

                        lock_clone.unlock(key.clone()).await.unwrap();
                    }
                })
            })
            .collect::<Vec<_>>();
        let _ = futures::future::join_all(tasks).await;

        assert!(counters.values().all(|x| x.load(Ordering::Relaxed) == 1000));
    }
}
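MemLock's core trick is tokio's OwnedMutexGuard: the guard owns an Arc to its mutex, so it can be stashed in the guards map and released later, from any task, by removing (dropping) it. A standalone sketch of just that mechanism:

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let m = Arc::new(Mutex::new(()));

    // Owned guard: no borrow of `m`, so it can live in a map like MemLock's
    // `guards` and be dropped from anywhere.
    let guard = m.clone().lock_owned().await;

    let waiter = tokio::spawn({
        let m = m.clone();
        async move { m.lock_owned().await } // resolves once `guard` is dropped
    });

    drop(guard); // the moral equivalent of MemLock::unlock removing the entry
    waiter.await.unwrap();
}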
@@ -57,7 +57,6 @@ use crate::error::{
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
 use crate::handler::HeartbeatHandlerGroupRef;
 use crate::lease::lookup_datanode_peer;
-use crate::lock::DistLockRef;
 use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
 use crate::procedure::ProcedureManagerListenerAdapter;
 use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
@@ -356,7 +355,6 @@ pub struct Metasrv {
     flow_selector: SelectorRef,
     handler_group: HeartbeatHandlerGroupRef,
     election: Option<ElectionRef>,
-    lock: DistLockRef,
     procedure_manager: ProcedureManagerRef,
     mailbox: MailboxRef,
     procedure_executor: ProcedureExecutorRef,
@@ -568,10 +566,6 @@ impl Metasrv {
         self.election.as_ref()
     }

-    pub fn lock(&self) -> &DistLockRef {
-        &self.lock
-    }
-
     pub fn mailbox(&self) -> &MailboxRef {
         &self.mailbox
     }
@@ -52,8 +52,6 @@ use crate::handler::{
     HeartbeatHandlerGroup, HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers,
 };
 use crate::lease::MetaPeerLookupService;
-use crate::lock::memory::MemLock;
-use crate::lock::DistLockRef;
 use crate::metasrv::{
     ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ,
 };
@@ -79,7 +77,6 @@ pub struct MetasrvBuilder {
     handler_group: Option<HeartbeatHandlerGroup>,
     election: Option<ElectionRef>,
     meta_peer_client: Option<MetaPeerClientRef>,
-    lock: Option<DistLockRef>,
     node_manager: Option<NodeManagerRef>,
     plugins: Option<Plugins>,
     table_metadata_allocator: Option<TableMetadataAllocatorRef>,
@@ -95,7 +92,6 @@ impl MetasrvBuilder {
             meta_peer_client: None,
             election: None,
             options: None,
-            lock: None,
             node_manager: None,
             plugins: None,
             table_metadata_allocator: None,
@@ -137,11 +133,6 @@ impl MetasrvBuilder {
         self
     }

-    pub fn lock(mut self, lock: Option<DistLockRef>) -> Self {
-        self.lock = lock;
-        self
-    }
-
     pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
         self.node_manager = Some(node_manager);
         self
@@ -171,7 +162,6 @@ impl MetasrvBuilder {
             in_memory,
             selector,
             handler_group,
-            lock,
             node_manager,
             plugins,
             table_metadata_allocator,
@@ -205,7 +195,6 @@ impl MetasrvBuilder {
         let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
             leader_cached_kv_backend.clone() as _,
         ));
-        let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
         let selector_ctx = SelectorContext {
             server_addr: options.server_addr.clone(),
             datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
@@ -384,7 +373,6 @@ impl MetasrvBuilder {
             flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
             handler_group: Arc::new(handler_group),
             election,
-            lock,
             procedure_manager,
             mailbox,
             procedure_executor: ddl_manager,
@@ -20,7 +20,6 @@ use tonic::{Response, Status};
 pub mod admin;
 pub mod cluster;
 mod heartbeat;
-pub mod lock;
 pub mod mailbox;
 pub mod procedure;
 pub mod store;
@@ -1,51 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::{lock_server, LockRequest, LockResponse, UnlockRequest, UnlockResponse};
use tonic::{Request, Response};

use super::GrpcResult;
use crate::lock::Opts;
use crate::metasrv::Metasrv;

#[async_trait::async_trait]
impl lock_server::Lock for Metasrv {
    async fn lock(&self, request: Request<LockRequest>) -> GrpcResult<LockResponse> {
        let LockRequest {
            name, expire_secs, ..
        } = request.into_inner();
        let expire_secs = Some(expire_secs as u64);

        let key = self.lock().lock(name, Opts { expire_secs }).await?;

        let resp = LockResponse {
            key,
            ..Default::default()
        };

        Ok(Response::new(resp))
    }

    async fn unlock(&self, request: Request<UnlockRequest>) -> GrpcResult<UnlockResponse> {
        let UnlockRequest { key, .. } = request.into_inner();

        let _ = self.lock().unlock(key).await?;

        let resp = UnlockResponse {
            ..Default::default()
        };

        Ok(Response::new(resp))
    }
}
@@ -64,19 +64,15 @@ impl MetricEngineInner {
     /// Return the physical region id behind this logical region
     async fn alter_logical_region(
         &self,
-        logical_region_id: RegionId,
+        region_id: RegionId,
         request: RegionAlterRequest,
     ) -> Result<RegionId> {
         let physical_region_id = {
             let state = &self.state.read().unwrap();
-            state
-                .get_physical_region_id(logical_region_id)
-                .with_context(|| {
-                    error!("Trying to alter an nonexistent region {logical_region_id}");
-                    LogicalRegionNotFoundSnafu {
-                        region_id: logical_region_id,
-                    }
-                })?
+            state.get_physical_region_id(region_id).with_context(|| {
+                error!("Trying to alter an nonexistent region {region_id}");
+                LogicalRegionNotFoundSnafu { region_id }
+            })?
         };

         // only handle adding column
@@ -91,7 +87,7 @@ impl MetricEngineInner {
             .metadata_region
             .column_semantic_type(
                 metadata_region_id,
-                logical_region_id,
+                region_id,
                 &col.column_metadata.column_schema.name,
             )
             .await?
@@ -106,7 +102,7 @@
         self.add_columns_to_physical_data_region(
             data_region_id,
             metadata_region_id,
-            logical_region_id,
+            region_id,
             columns_to_add,
         )
         .await?;
@@ -114,16 +110,10 @@
         // register columns to logical region
         for col in columns {
             self.metadata_region
-                .add_column(metadata_region_id, logical_region_id, &col.column_metadata)
+                .add_column(metadata_region_id, region_id, &col.column_metadata)
                 .await?;
         }

-        // invalid logical column cache
-        self.state
-            .write()
-            .unwrap()
-            .invalid_logical_column_cache(logical_region_id);
-
         Ok(physical_region_id)
     }
@@ -169,11 +169,11 @@
     ) -> Result<Vec<usize>> {
         // project on logical columns
         let all_logical_columns = self
-            .load_logical_column_names(physical_region_id, logical_region_id)
+            .load_logical_columns(physical_region_id, logical_region_id)
            .await?;
         let projected_logical_names = origin_projection
             .iter()
-            .map(|i| all_logical_columns[*i].clone())
+            .map(|i| all_logical_columns[*i].column_schema.name.clone())
             .collect::<Vec<_>>();

         // generate physical projection
@@ -200,8 +200,10 @@
         logical_region_id: RegionId,
     ) -> Result<Vec<usize>> {
         let logical_columns = self
-            .load_logical_column_names(physical_region_id, logical_region_id)
-            .await?;
+            .load_logical_columns(physical_region_id, logical_region_id)
+            .await?
+            .into_iter()
+            .map(|col| col.column_schema.name);
         let mut projection = Vec::with_capacity(logical_columns.len());
         let data_region_id = utils::to_data_region_id(physical_region_id);
         let physical_metadata = self
@@ -23,25 +23,13 @@ use crate::error::Result;
 impl MetricEngineInner {
     /// Load column metadata of a logical region.
     ///
-    /// The return value is ordered on column name.
+    /// The return value is ordered on [ColumnId].
     pub async fn load_logical_columns(
         &self,
         physical_region_id: RegionId,
         logical_region_id: RegionId,
     ) -> Result<Vec<ColumnMetadata>> {
-        // First try to load from state cache
-        if let Some(columns) = self
-            .state
-            .read()
-            .unwrap()
-            .logical_columns()
-            .get(&logical_region_id)
-        {
-            return Ok(columns.clone());
-        }
-
-        // Else load from metadata region and update the cache.
-        // Load logical and physical columns, and intersect them to get logical column metadata.
+        // load logical and physical columns, and intersect them to get logical column metadata
         let mut logical_column_metadata = self
             .metadata_region
             .logical_columns(physical_region_id, logical_region_id)
@@ -49,48 +37,11 @@
             .into_iter()
             .map(|(_, column_metadata)| column_metadata)
             .collect::<Vec<_>>();
-        // Sort columns on column name to ensure the order
-        logical_column_metadata
-            .sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
-        // Update cache
-        self.state
-            .write()
-            .unwrap()
-            .add_logical_columns(logical_region_id, logical_column_metadata.clone());
+
+        // sort columns on column id to ensure the order

         Ok(logical_column_metadata)
     }
-
-    /// Load logical column names of a logical region.
-    ///
-    /// The return value is ordered on column name alphabetically.
-    pub async fn load_logical_column_names(
-        &self,
-        physical_region_id: RegionId,
-        logical_region_id: RegionId,
-    ) -> Result<Vec<String>> {
-        // First try to load from state cache
-        if let Some(columns) = self
-            .state
-            .read()
-            .unwrap()
-            .logical_columns()
-            .get(&logical_region_id)
-        {
-            return Ok(columns
-                .iter()
-                .map(|c| c.column_schema.name.clone())
-                .collect());
-        }
-
-        // Else load from metadata region
-        let columns = self
-            .load_logical_columns(physical_region_id, logical_region_id)
-            .await?
-            .into_iter()
-            .map(|c| c.column_schema.name)
-            .collect::<Vec<_>>();
-
-        Ok(columns)
-    }
 }
@@ -17,7 +17,6 @@
 use std::collections::{HashMap, HashSet};

 use snafu::OptionExt;
-use store_api::metadata::ColumnMetadata;
 use store_api::storage::RegionId;

 use crate::error::{PhysicalRegionNotFoundSnafu, Result};
@@ -36,10 +35,6 @@ pub(crate) struct MetricEngineState {
     /// Cache for the columns of physical regions.
     /// The region id in key is the data region id.
     physical_columns: HashMap<RegionId, HashSet<String>>,
-    /// Cache for the column metadata of logical regions.
-    /// The column order is the same with the order in the metadata, which is
-    /// alphabetically ordered on column name.
-    logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
 }

 impl MetricEngineState {
@@ -85,21 +80,6 @@ impl MetricEngineState {
             .insert(logical_region_id, physical_region_id);
     }

-    /// Add and reorder logical columns.
-    ///
-    /// Caller should make sure:
-    /// 1. there is no duplicate columns
-    /// 2. the column order is the same with the order in the metadata, which is
-    ///    alphabetically ordered on column name.
-    pub fn add_logical_columns(
-        &mut self,
-        logical_region_id: RegionId,
-        new_columns: impl IntoIterator<Item = ColumnMetadata>,
-    ) {
-        let columns = self.logical_columns.entry(logical_region_id).or_default();
-        columns.extend(new_columns);
-    }
-
     pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
         self.logical_regions.get(&logical_region_id).copied()
     }
@@ -108,10 +88,6 @@ impl MetricEngineState {
         &self.physical_columns
     }

-    pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
-        &self.logical_columns
-    }
-
     pub fn physical_regions(&self) -> &HashMap<RegionId, HashSet<RegionId>> {
         &self.physical_regions
     }
@@ -153,15 +129,9 @@ impl MetricEngineState {
             .unwrap() // Safety: physical_region_id is got from physical_regions
             .remove(&logical_region_id);

-        self.logical_columns.remove(&logical_region_id);
-
         Ok(())
     }

-    pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) {
-        self.logical_columns.remove(&logical_region_id);
-    }
-
     pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
         self.logical_regions().contains_key(&logical_region_id)
     }
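The left-hand side of these hunks is a read-through cache: check the state under a read lock, fall back to the metadata region, sort for a deterministic order, then fill the cache under a write lock. A simplified standalone sketch of that pattern, with illustrative types in place of the engine's:

use std::collections::HashMap;
use std::sync::RwLock;

struct ColumnCache {
    logical_columns: RwLock<HashMap<u64, Vec<String>>>,
}

impl ColumnCache {
    fn load(&self, region_id: u64, fetch: impl FnOnce() -> Vec<String>) -> Vec<String> {
        // Fast path: read lock only.
        if let Some(cols) = self.logical_columns.read().unwrap().get(&region_id) {
            return cols.clone();
        }
        // Slow path: fetch, sort for a stable order, then populate the cache.
        let mut cols = fetch();
        cols.sort_unstable();
        self.logical_columns
            .write()
            .unwrap()
            .insert(region_id, cols.clone());
        cols
    }
}

fn main() {
    let cache = ColumnCache { logical_columns: RwLock::new(HashMap::new()) };
    let cols = cache.load(1, || vec!["b".into(), "a".into()]);
    assert_eq!(cols, vec!["a".to_string(), "b".to_string()]);
}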