fix: revoke meta kv writes outside metasrv leader (#8060)

* fix: revoke meta kv writes outside metasrv leader

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* fix: address meta kv write guard review comments

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* fix: change visibility of new_writable method to super

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* fix: gate direct meta store writes to tests

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* fix: keep meta store server path unchanged

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

---------

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>
QuakeWang committed 2026-05-11 10:33:35 +08:00 (committed by GitHub)
parent 7a285c2890
commit e203ff9e1f
16 changed files with 639 additions and 112 deletions
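
Before the per-file diffs, a minimal sketch of the guard this commit introduces (crate paths and a started MetaClient are assumed from the code below; this is an illustration, not an excerpt from the diff):

use std::sync::Arc;

use catalog::kvbackend::new_read_only_meta_kv_backend;
use common_meta::rpc::store::PutRequest;
use meta_client::client::MetaClient;

async fn demo(meta_client: Arc<MetaClient>) {
    let backend = new_read_only_meta_kv_backend(meta_client);
    // Reads still reach metasrv through the inner MetaKvBackend.
    let _kv = backend.get(b"greptime/some_key").await.unwrap();
    // Writes are rejected locally, before any RPC is issued.
    let err = backend
        .put(PutRequest::new().with_key(b"k").with_value(b"v"))
        .await
        .unwrap_err();
    assert!(matches!(
        err,
        common_meta::error::Error::ReadOnlyKvBackend { .. }
    ));
}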

View File

@@ -20,6 +20,6 @@ mod table_cache;
pub use builder::{
CatalogManagerConfigurator, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, new_read_only_meta_kv_backend};
pub use manager::KvBackendCatalogManager;
pub use table_cache::{TableCache, TableCacheRef, new_table_cache};

View File

@@ -21,7 +21,10 @@ use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::error::{
CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result, UnsupportedSnafu,
};
use common_meta::kv_backend::read_only::ReadOnlyKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::KeyValue;
@@ -357,19 +360,35 @@ impl CachedKvBackend {
}
#[derive(Debug)]
pub struct MetaKvBackend {
pub client: Arc<MetaClient>,
pub(crate) struct MetaKvBackend {
client: Arc<MetaClient>,
}
impl MetaKvBackend {
/// Constructs a [MetaKvBackend].
pub fn new(client: Arc<MetaClient>) -> MetaKvBackend {
fn new(client: Arc<MetaClient>) -> MetaKvBackend {
MetaKvBackend { client }
}
}
pub fn new_read_only_meta_kv_backend(client: Arc<MetaClient>) -> KvBackendRef {
Arc::new(ReadOnlyKvBackend::new(Arc::new(MetaKvBackend::new(client))))
}
#[async_trait::async_trait]
impl TxnService for MetaKvBackend {
type Error = Error;
async fn txn(&self, _txn: Txn) -> Result<TxnResponse> {
UnsupportedSnafu {
operation: "MetaKvBackend txn",
}
.fail()
}
fn max_txn_ops(&self) -> usize {
usize::MAX
}
}
/// Implement `KvBackend` trait for `MetaKvBackend` instead of opendal's `Accessor` since
@@ -465,6 +484,9 @@ mod tests {
use std::sync::atomic::{AtomicU32, Ordering};
use async_trait::async_trait;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::read_only::ReadOnlyKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnOp};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::KeyValue;
use common_meta::rpc::store::{
@@ -473,8 +495,9 @@ mod tests {
PutResponse, RangeRequest, RangeResponse,
};
use dashmap::DashMap;
use meta_client::client::MetaClientBuilder;
use super::CachedKvBackend;
use super::{CachedKvBackend, new_read_only_meta_kv_backend};
#[derive(Default)]
pub struct SimpleKvBackend {
@@ -579,6 +602,62 @@ mod tests {
}
}
#[tokio::test]
async fn test_cached_kv_backend_rejects_writes_with_read_only_inner() {
let inner = Arc::new(MemoryKvBackend::<common_meta::error::Error>::new());
let cached_kv = CachedKvBackend::wrap(Arc::new(ReadOnlyKvBackend::new(inner)));
let err = cached_kv
.put(PutRequest {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
prev_kv: false,
})
.await
.unwrap_err();
assert!(matches!(
err,
common_meta::error::Error::ReadOnlyKvBackend { .. }
));
}
#[tokio::test]
async fn test_read_only_meta_kv_backend_rejects_writes() {
let meta_client = Arc::new(MetaClientBuilder::frontend_default_options().build());
let backend = new_read_only_meta_kv_backend(meta_client);
let err = backend
.put(PutRequest {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
prev_kv: false,
})
.await
.unwrap_err();
assert!(matches!(
err,
common_meta::error::Error::ReadOnlyKvBackend { .. }
));
}
#[tokio::test]
async fn test_read_only_meta_kv_backend_does_not_emulate_txn() {
let meta_client = Arc::new(MetaClientBuilder::frontend_default_options().build());
let backend = new_read_only_meta_kv_backend(meta_client);
let result = backend
.txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec())]))
.await;
let err = match result {
Ok(_) => panic!("expected unsupported txn error"),
Err(err) => err,
};
assert!(matches!(err, common_meta::error::Error::Unsupported { .. }));
}
async fn add_some_vals(kv_backend: &impl KvBackend) {
kv_backend
.put(PutRequest {

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use catalog::kvbackend::new_read_only_meta_kv_backend;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::info;
@@ -99,9 +99,7 @@ impl InstanceBuilder {
.await
.context(MetaClientInitSnafu)?;
let backend = Arc::new(MetaKvBackend {
client: client.clone(),
});
let backend = new_read_only_meta_kv_backend(client.clone());
let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone());
let registry = Arc::new(

View File

@@ -19,7 +19,9 @@ use std::time::Duration;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::kvbackend::{
CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, new_read_only_meta_kv_backend,
};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
@@ -46,8 +48,8 @@ use snafu::{OptionExt, ResultExt, ensure};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
BuildCacheRegistrySnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu,
OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
@@ -300,13 +302,14 @@ impl StartCommand {
let cache_ttl = meta_config.metadata_cache_ttl;
let cache_tti = meta_config.metadata_cache_tti;
let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry
@@ -316,7 +319,7 @@ impl StartCommand {
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
build_fundamental_cache_registry(readonly_meta_backend.clone());
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
@@ -346,10 +349,6 @@ impl StartCommand {
let table_metadata_manager =
Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
table_metadata_manager
.init()
.await
.context(InitMetadataSnafu)?;
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
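
Dropping the table_metadata_manager.init() call (and the InitMetadataSnafu import above) follows from the read-only backend: init() bootstraps metadata through writes, so against this backend it would presumably fail rather than initialize, and that work is left to metasrv. A hypothetical check of that assumption:

use std::sync::Arc;

use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;

// Assumption: init() issues puts under the hood, so the read-only wrapper
// turns the old bootstrap call into an error instead of a write.
async fn init_now_fails(read_only_backend: KvBackendRef) {
    let manager = TableMetadataManager::new(read_only_backend);
    let err = manager.init().await.unwrap_err();
    assert!(matches!(
        err,
        common_meta::error::Error::ReadOnlyKvBackend { .. }
    ));
}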

View File

@@ -23,7 +23,7 @@ use catalog::information_extension::DistributedInformationExtension;
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::{
CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
MetaKvBackend,
new_read_only_meta_kv_backend,
};
use catalog::process_manager::ProcessManager;
use clap::Parser;
@@ -393,13 +393,14 @@ impl StartCommand {
.await
.context(error::StartFrontendSnafu)?;
let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = CachedKvBackendBuilder::new(readonly_meta_backend.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry
@@ -409,7 +410,7 @@ impl StartCommand {
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
build_fundamental_cache_registry(readonly_meta_backend.clone());
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),

View File

@@ -155,6 +155,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Trying to write to a read-only kv backend: {}", name))]
ReadOnlyKvBackend {
name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
ProcedureStateReceiver {
procedure_id: ProcedureId,
@@ -1146,7 +1153,7 @@ impl ErrorExt for Error {
| ColumnIdMismatch { .. }
| TimestampMismatch { .. } => StatusCode::Unexpected,
Unsupported { .. } => StatusCode::Unsupported,
Unsupported { .. } | ReadOnlyKvBackend { .. } => StatusCode::Unsupported,
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
SerdeJson { .. }
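
Both this mapping and the meta-client one later in the commit surface the rejection as StatusCode::Unsupported, so callers can treat it as non-retryable. A hypothetical caller-side check:

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;

// A rejected meta kv write signals a wiring bug, not a transient fault,
// so retry loops can fail fast on this status.
fn is_rejected_meta_write(err: &common_meta::error::Error) -> bool {
    err.status_code() == StatusCode::Unsupported
}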

View File

@@ -1660,6 +1660,7 @@ mod tests {
};
use crate::kv_backend::KvBackend;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::read_only::ReadOnlyKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
use crate::rpc::store::{PutRequest, RangeRequest};
@@ -1948,6 +1949,46 @@ mod tests {
assert_eq!(region_routes[0].follower_peers[0].addr, "new-a3");
}
#[tokio::test]
async fn test_get_full_table_info_with_read_only_kv_backend() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let writable_manager = TableMetadataManager::new(mem_kv.clone());
let region_routes = vec![new_test_region_route()];
let table_info = new_test_table_info();
let table_id = table_info.ident.table_id;
create_physical_table_metadata(
&writable_manager,
table_info.clone(),
region_routes.clone(),
HashMap::new(),
)
.await
.unwrap();
let read_only_kv = Arc::new(ReadOnlyKvBackend::new(mem_kv));
let read_only_manager = TableMetadataManager::new(read_only_kv);
let (remote_table_info, remote_table_route) = read_only_manager
.get_full_table_info(table_id)
.await
.unwrap();
assert_eq!(
remote_table_info.unwrap().into_inner().table_info,
table_info
);
assert_eq!(
remote_table_route
.unwrap()
.into_inner()
.region_routes()
.unwrap(),
&region_routes
);
}
#[tokio::test]
async fn test_create_logic_tables_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());

View File

@@ -33,6 +33,7 @@ pub mod etcd;
pub mod memory;
#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
pub mod rds;
pub mod read_only;
pub mod test;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

View File

@@ -0,0 +1,348 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use crate::error::{ReadOnlyKvBackendSnafu, Result};
use crate::kv_backend::txn::{Txn, TxnOp, TxnResponse};
use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
use crate::rpc::KeyValue;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
/// A [`KvBackend`] wrapper that forwards reads and rejects writes.
pub struct ReadOnlyKvBackend {
inner: KvBackendRef,
name: String,
}
impl ReadOnlyKvBackend {
pub fn new(inner: KvBackendRef) -> Self {
let name = format!("ReadOnlyKvBackend({})", inner.name());
Self { inner, name }
}
fn read_only<T>(&self) -> Result<T> {
ReadOnlyKvBackendSnafu {
name: self.name.clone(),
}
.fail()
}
fn validate_read_only_ops(&self, ops: &[TxnOp]) -> Result<()> {
if ops
.iter()
.any(|op| matches!(op, TxnOp::Put(_, _) | TxnOp::Delete(_)))
{
self.read_only()
} else {
Ok(())
}
}
fn validate_read_only_txn(&self, txn: &Txn) -> Result<()> {
self.validate_read_only_ops(&txn.req.success)?;
self.validate_read_only_ops(&txn.req.failure)
}
}
#[async_trait::async_trait]
impl TxnService for ReadOnlyKvBackend {
type Error = crate::error::Error;
async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
self.validate_read_only_txn(&txn)?;
self.inner.txn(txn).await
}
fn max_txn_ops(&self) -> usize {
self.inner.max_txn_ops()
}
}
#[async_trait::async_trait]
impl KvBackend for ReadOnlyKvBackend {
fn name(&self) -> &str {
&self.name
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.inner.range(req).await
}
async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
self.read_only()
}
async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
self.read_only()
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
self.inner.batch_get(req).await
}
async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
self.read_only()
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
self.read_only()
}
async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
self.read_only()
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
self.inner.get(key).await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use super::*;
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::{Compare, CompareOp, TxnOpResponse};
use crate::rpc::store::{
BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest,
DeleteRangeRequest, PutRequest,
};
async fn read_only_backend() -> ReadOnlyKvBackend {
let inner = Arc::new(MemoryKvBackend::<Error>::new());
inner
.put(PutRequest::new().with_key(b"k1").with_value(b"v1"))
.await
.unwrap();
inner
.put(PutRequest::new().with_key(b"k2").with_value(b"v2"))
.await
.unwrap();
ReadOnlyKvBackend::new(inner)
}
fn assert_read_only<T>(result: Result<T>) {
let err = match result {
Ok(_) => panic!("expected read-only error"),
Err(err) => err,
};
assert!(matches!(err, Error::ReadOnlyKvBackend { .. }));
assert_eq!(err.status_code(), StatusCode::Unsupported);
}
struct TxnOnlyBackend;
#[async_trait::async_trait]
impl TxnService for TxnOnlyBackend {
type Error = Error;
async fn txn(&self, _txn: Txn) -> Result<TxnResponse> {
Ok(TxnResponse {
succeeded: true,
responses: vec![TxnOpResponse::ResponseGet(RangeResponse {
kvs: vec![KeyValue {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
}],
more: false,
})],
})
}
fn max_txn_ops(&self) -> usize {
7
}
}
#[async_trait::async_trait]
impl KvBackend for TxnOnlyBackend {
fn name(&self) -> &str {
"TxnOnlyBackend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, _req: RangeRequest) -> Result<RangeResponse> {
unimplemented!("read-only txn should delegate to inner txn")
}
async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
unimplemented!("read-only txn should delegate to inner txn")
}
async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
unimplemented!("read-only txn should delegate to inner txn")
}
async fn batch_get(&self, _req: BatchGetRequest) -> Result<BatchGetResponse> {
unimplemented!("read-only txn should delegate to inner txn")
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
unimplemented!("read-only txn should delegate to inner txn")
}
async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
unimplemented!("read-only txn should delegate to inner txn")
}
}
#[tokio::test]
async fn test_read_only_backend_forwards_reads() {
let backend = read_only_backend().await;
let range = backend
.range(RangeRequest::new().with_key(b"k1"))
.await
.unwrap();
assert_eq!(range.kvs.len(), 1);
assert_eq!(range.kvs[0].value, b"v1");
let kv = backend.get(b"k2").await.unwrap().unwrap();
assert_eq!(kv.value, b"v2");
let batch = backend
.batch_get(BatchGetRequest::new().add_key(b"k1").add_key(b"k2"))
.await
.unwrap();
assert_eq!(batch.kvs.len(), 2);
}
#[tokio::test]
async fn test_read_only_backend_rejects_writes() {
let backend = read_only_backend().await;
assert_read_only(
backend
.put(PutRequest::new().with_key(b"k3").with_value(b"v3"))
.await,
);
assert_read_only(
backend
.batch_put(BatchPutRequest::new().add_kv(b"k3", b"v3"))
.await,
);
assert_read_only(
backend
.compare_and_put(
CompareAndPutRequest::new()
.with_key(b"k1")
.with_expect(b"v1")
.with_value(b"v3"),
)
.await,
);
assert_read_only(
backend
.delete_range(DeleteRangeRequest::new().with_key(b"k1"))
.await,
);
assert_read_only(
backend
.batch_delete(BatchDeleteRequest::new().add_key(b"k1"))
.await,
);
}
#[tokio::test]
async fn test_read_only_backend_rejects_write_txn() {
let backend = read_only_backend().await;
assert_eq!(backend.max_txn_ops(), usize::MAX);
assert_read_only(
backend
.txn(Txn::put_if_not_exists(b"k3".to_vec(), b"v3".to_vec()))
.await,
);
}
#[tokio::test]
async fn test_read_only_backend_delegates_read_txn() {
let backend = ReadOnlyKvBackend::new(Arc::new(TxnOnlyBackend));
assert_eq!(backend.max_txn_ops(), 7);
let resp = backend
.txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec())]))
.await
.unwrap();
assert!(resp.succeeded);
let TxnOpResponse::ResponseGet(range) = &resp.responses[0] else {
panic!("expected get response");
};
assert_eq!(range.kvs[0].value, b"v1");
}
#[tokio::test]
async fn test_read_only_backend_allows_get_only_txn() {
let backend = read_only_backend().await;
let resp = backend
.txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec()), TxnOp::Get(b"k2".to_vec())]))
.await
.unwrap();
assert!(resp.succeeded);
assert_eq!(resp.responses.len(), 2);
let TxnOpResponse::ResponseGet(range) = &resp.responses[0] else {
panic!("expected get response");
};
assert_eq!(range.kvs.len(), 1);
assert_eq!(range.kvs[0].value, b"v1");
let TxnOpResponse::ResponseGet(range) = &resp.responses[1] else {
panic!("expected get response");
};
assert_eq!(range.kvs.len(), 1);
assert_eq!(range.kvs[0].value, b"v2");
}
#[tokio::test]
async fn test_read_only_backend_allows_compare_and_get_txn() {
let backend = read_only_backend().await;
let txn = Txn::new()
.when(vec![Compare::with_value(
b"k1".to_vec(),
CompareOp::Equal,
b"v1".to_vec(),
)])
.and_then(vec![TxnOp::Get(b"k2".to_vec())])
.or_else(vec![TxnOp::Get(b"k1".to_vec())]);
let resp = backend.txn(txn).await.unwrap();
assert!(resp.succeeded);
let TxnOpResponse::ResponseGet(range) = &resp.responses[0] else {
panic!("expected get response");
};
assert_eq!(range.kvs.len(), 1);
assert_eq!(range.kvs[0].value, b"v2");
}
}
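
The wrapper composes over any KvBackendRef, which is what both new_read_only_meta_kv_backend and the tests above rely on. A sketch of the composition, assuming the same crate paths:

use std::sync::Arc;

use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::read_only::ReadOnlyKvBackend;

// Reads and write-free txns pass through to `inner`; every write path fails
// with Error::ReadOnlyKvBackend named "ReadOnlyKvBackend(<inner name>)".
fn guard(inner: KvBackendRef) -> KvBackendRef {
    Arc::new(ReadOnlyKvBackend::new(inner))
}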

View File

@@ -16,10 +16,7 @@ use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, Peer};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest,
PutRequest, RangeRequest,
};
use common_meta::rpc::store::{BatchGetRequest, RangeRequest};
use meta_client::client::MetaClientBuilder;
use tracing::{Level, event, subscriber};
use tracing_subscriber::FmtSubscriber;
@@ -67,14 +64,6 @@ async fn run() {
}
});
// put
let put = PutRequest::new()
.with_key(b"key1".to_vec())
.with_value(b"value1".to_vec())
.with_prev_kv();
let res = meta_client.put(put).await.unwrap();
event!(Level::INFO, "put result: {:#?}", res);
// get
let range = RangeRequest::new().with_key(b"key1".to_vec());
let res = meta_client.range(range.clone()).await.unwrap();
@@ -85,60 +74,10 @@ async fn run() {
let res = meta_client.range(range2.clone()).await.unwrap();
event!(Level::INFO, "get prefix result: {:#?}", res);
// batch put
let batch_put = BatchPutRequest::new()
.add_kv(b"batch_put1".to_vec(), b"batch_put_v1".to_vec())
.add_kv(b"batch_put2".to_vec(), b"batch_put_v2".to_vec())
.with_prev_kv();
let res = meta_client.batch_put(batch_put).await.unwrap();
event!(Level::INFO, "batch put result: {:#?}", res);
// cas
let cas = CompareAndPutRequest::new()
.with_key(b"batch_put1".to_vec())
.with_expect(b"batch_put_v_fail".to_vec())
.with_value(b"batch_put_v111".to_vec());
let res = meta_client.compare_and_put(cas).await.unwrap();
event!(Level::INFO, "cas 0 result: {:#?}", res);
let cas = CompareAndPutRequest::new()
.with_key(b"batch_put1".to_vec())
.with_expect(b"batch_put_v1".to_vec())
.with_value(b"batch_put_v111".to_vec());
let res = meta_client.compare_and_put(cas).await.unwrap();
event!(Level::INFO, "cas 1 result: {:#?}", res);
// delete
let delete_range = DeleteRangeRequest::new().with_key(b"key1".to_vec());
let res = meta_client.delete_range(delete_range).await.unwrap();
event!(Level::INFO, "delete range result: {:#?}", res);
// get none
let res = meta_client.range(range).await.unwrap();
event!(Level::INFO, "get range result: {:#?}", res);
// batch delete
// put two
let batch_put = BatchPutRequest::new()
.add_kv(b"batch_put1".to_vec(), b"batch_put_v1".to_vec())
.add_kv(b"batch_put2".to_vec(), b"batch_put_v2".to_vec())
.with_prev_kv();
let res = meta_client.batch_put(batch_put).await.unwrap();
event!(Level::INFO, "batch put result: {:#?}", res);
// delete one
let batch_delete = BatchDeleteRequest::new()
.add_key(b"batch_put1".to_vec())
.with_prev_kv();
let res = meta_client.batch_delete(batch_delete).await.unwrap();
event!(Level::INFO, "batch delete result: {:#?}", res);
// get other one
// batch get
let batch_get = BatchGetRequest::new()
.add_key(b"batch_put1".to_vec())
.add_key(b"batch_put2".to_vec());
.add_key(b"key1".to_vec())
.add_key(b"key2".to_vec());
let res = meta_client.batch_get(batch_get).await.unwrap();
event!(Level::INFO, "batch get result: {:#?}", res);

View File

@@ -91,6 +91,8 @@ pub struct MetaClientBuilder {
role: Role,
enable_heartbeat: bool,
enable_store: bool,
#[cfg(test)]
enable_direct_store_writes: bool,
enable_procedure: bool,
enable_access_cluster_info: bool,
region_follower: Option<RegionFollowerClientRef>,
@@ -143,6 +145,10 @@ impl MetaClientBuilder {
}
}
/// Enables the Store client in read-only mode.
///
/// Store write methods fail fast by default. Metadata writes from production
/// frontend/datanode/flownode clients should go through metasrv procedures.
pub fn enable_store(self) -> Self {
Self {
enable_store: true,
@@ -150,6 +156,18 @@ impl MetaClientBuilder {
}
}
/// Enables direct Store write RPCs for tests.
///
/// Production metadata writes should use metasrv-owned write paths instead.
#[cfg(test)]
pub(super) fn enable_direct_store_writes_for_test(self) -> Self {
Self {
enable_store: true,
enable_direct_store_writes: true,
..self
}
}
pub fn enable_procedure(self) -> Self {
Self {
enable_procedure: true,
@@ -216,9 +234,16 @@ impl MetaClientBuilder {
let config = self
.enable_heartbeat
.then(|| ConfigClient::new(self.id, self.role, mgr.clone()));
let store = self
.enable_store
.then(|| StoreClient::new(self.id, self.role, mgr.clone()));
let store = self.enable_store.then(|| {
#[cfg(test)]
{
if self.enable_direct_store_writes {
return StoreClient::new_writable(self.id, self.role, mgr.clone());
}
}
StoreClient::new(self.id, self.role, mgr.clone())
});
let procedure = self.enable_procedure.then(|| {
let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
ProcedureClient::new(
@@ -989,6 +1014,19 @@ mod tests {
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
#[tokio::test]
async fn test_store_writes_are_read_only_by_default() {
let meta_client = MetaClientBuilder::new(0, Role::Datanode)
.enable_store()
.build();
let res = meta_client.put(PutRequest::default()).await;
assert!(matches!(
res.err(),
Some(error::Error::ReadOnlyKvBackend { .. })
));
}
#[tokio::test]
async fn test_ask_leader() {
let tc = new_client("test_ask_leader").await;
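
Together with the catalog-layer wrapper, the read-only Store client makes the guard defense-in-depth: code that bypasses new_read_only_meta_kv_backend and calls the Store API directly hits the same error, and only in-crate test code can opt back in. A sketch condensing both paths (the Role import path is an assumption):

use api::v1::meta::Role;
use common_meta::rpc::store::PutRequest;
use meta_client::client::MetaClientBuilder;

async fn direct_store_writes_are_gated() {
    // Default: the put fails before any RPC, mirroring the new test above.
    let client = MetaClientBuilder::new(0, Role::Datanode)
        .enable_store()
        .build();
    let res = client.put(PutRequest::default()).await;
    assert!(res.is_err());
    // Opting back in is cfg(test)- and pub(super)-gated:
    // MetaClientBuilder::datanode_default_options(id)
    //     .enable_direct_store_writes_for_test()
    //     .build();
}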

View File

@@ -39,11 +39,27 @@ pub struct Client {
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
Self::new_with_read_only(id, role, channel_manager, true)
}
/// Builds a writable direct Store RPC client for tests.
#[cfg(test)]
pub(super) fn new_writable(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
Self::new_with_read_only(id, role, channel_manager, false)
}
fn new_with_read_only(
id: Id,
role: Role,
channel_manager: ChannelManager,
read_only: bool,
) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
peers: vec![],
read_only,
}));
Self { inner }
@@ -103,6 +119,7 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
peers: Vec<String>,
read_only: bool,
}
impl Inner {
@@ -142,6 +159,8 @@ impl Inner {
}
async fn put(&self, mut req: PutRequest) -> Result<PutResponse> {
self.ensure_writable()?;
let mut client = self.random_client()?;
req.set_header(
self.id,
@@ -167,6 +186,8 @@ impl Inner {
}
async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse> {
self.ensure_writable()?;
let mut client = self.random_client()?;
req.set_header(
self.id,
@@ -179,6 +200,8 @@ impl Inner {
}
async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
self.ensure_writable()?;
let mut client = self.random_client()?;
req.set_header(
self.id,
@@ -194,6 +217,8 @@ impl Inner {
&self,
mut req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse> {
self.ensure_writable()?;
let mut client = self.random_client()?;
req.set_header(
self.id,
@@ -209,6 +234,8 @@ impl Inner {
}
async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
self.ensure_writable()?;
let mut client = self.random_client()?;
req.set_header(
self.id,
@@ -231,6 +258,17 @@ impl Inner {
self.make_client(peer)
}
fn ensure_writable(&self) -> Result<()> {
if self.read_only {
return error::ReadOnlyKvBackendSnafu {
name: "MetaClient Store".to_string(),
}
.fail();
}
Ok(())
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<StoreClient<Channel>> {
let channel = self
.channel_manager
@@ -284,4 +322,26 @@ mod test {
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
#[tokio::test]
async fn test_read_only_store_rejects_writes_before_rpc() {
let client = Client::new(0, Role::Frontend, ChannelManager::default());
fn assert_read_only<T>(result: Result<T>) {
assert!(matches!(
result,
Err(error::Error::ReadOnlyKvBackend { .. })
));
}
assert_read_only(client.put(PutRequest::default()).await);
assert_read_only(client.batch_put(BatchPutRequest::default()).await);
assert_read_only(client.batch_delete(BatchDeleteRequest::default()).await);
assert_read_only(
client
.compare_and_put(CompareAndPutRequest::default())
.await,
);
assert_read_only(client.delete_range(DeleteRangeRequest::default()).await);
}
}

View File

@@ -144,9 +144,10 @@ impl ErrorExt for Error {
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. }
| Error::RetryTimesExceeded { .. }
| Error::ReadOnlyKvBackend { .. }
| Error::ConvertMetaConfig { .. } => StatusCode::Internal,
Error::ReadOnlyKvBackend { .. } => StatusCode::Unsupported,
Error::MetaServer { code, .. } => *code,
Error::InvalidResponseHeader { source, .. }

View File

@@ -63,6 +63,7 @@ pub async fn mock_client_by(server_addr: String, channel_manager: ChannelManager
let id = 2000u64;
let mut meta_client = MetaClientBuilder::datanode_default_options(id)
.enable_access_cluster_info()
.enable_direct_store_writes_for_test()
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();

View File

@@ -26,7 +26,9 @@ use cache::{
with_default_composite_cache_registry,
};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::kvbackend::{
CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, new_read_only_meta_kv_backend,
};
use catalog::process_manager::ProcessManager;
use client::Client;
use client::client_manager::NodeClients;
@@ -285,6 +287,9 @@ impl GreptimeDbClusterBuilder {
..Default::default()
};
test_util::prepare_another_catalog_and_schema_with_kv_backend(self.kv_backend.clone())
.await;
let metasrv = meta_srv::mocks::mock(
opt,
self.kv_backend.clone(),
@@ -307,8 +312,6 @@ impl GreptimeDbClusterBuilder {
.build_frontend(metasrv.clone(), datanode_clients, start_frontend_servers)
.await;
test_util::prepare_another_catalog_and_schema(&frontend.instance).await;
frontend.start().await.unwrap();
GreptimeDbCluster {
@@ -422,9 +425,7 @@ impl GreptimeDbClusterBuilder {
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let meta_backend = new_read_only_meta_kv_backend(meta_client.clone());
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
@@ -456,9 +457,9 @@ impl GreptimeDbClusterBuilder {
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);
let cached_meta_backend = Arc::new(
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
);
let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());
let cached_meta_backend =
Arc::new(CachedKvBackendBuilder::new(readonly_meta_backend.clone()).build());
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
@@ -466,7 +467,7 @@ impl GreptimeDbClusterBuilder {
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
build_fundamental_cache_registry(readonly_meta_backend.clone());
let cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),

View File

@@ -22,8 +22,10 @@ use axum::Router;
use catalog::kvbackend::KvBackendCatalogManager;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::key::TableMetadataManager;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::kv_backend::KvBackendRef;
use common_query::Output;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
@@ -907,6 +909,17 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.unwrap();
let table_metadata_manager = catalog_manager.table_metadata_manager_ref();
prepare_another_catalog_and_schema_with_manager(table_metadata_manager).await;
}
pub(crate) async fn prepare_another_catalog_and_schema_with_kv_backend(kv_backend: KvBackendRef) {
let table_metadata_manager = TableMetadataManager::new(kv_backend);
prepare_another_catalog_and_schema_with_manager(&table_metadata_manager).await;
}
async fn prepare_another_catalog_and_schema_with_manager(
table_metadata_manager: &TableMetadataManager,
) {
table_metadata_manager
.catalog_manager()
.create(CatalogNameKey::new("another_catalog"), true)