From fa4a497d758fb9b95646593b01163f49a4d3f498 Mon Sep 17 00:00:00 2001
From: fys <40801205+Fengys123@users.noreply.github.com>
Date: Wed, 24 May 2023 15:07:29 +0800
Subject: [PATCH] feat: add cache for catalog kv backend (#1592)

* feat: add kvbackend cache

* fix: cargo fmt
---
 Cargo.lock                                |  62 ++++++++++-
 src/catalog/Cargo.toml                    |   1 +
 src/catalog/src/error.rs                  |   5 +-
 src/catalog/src/lib.rs                    |   1 +
 src/catalog/src/metrics.rs                |   3 +
 src/catalog/src/remote.rs                 |  13 ++-
 src/catalog/src/remote/client.rs          | 123 ++++++++++++++++++++--
 src/catalog/tests/mock.rs                 |  14 ++-
 src/catalog/tests/remote_catalog_tests.rs |  60 ++++++++++-
 src/cmd/src/cli/repl.rs                   |   9 +-
 src/datanode/src/instance.rs              |  10 +-
 src/frontend/src/catalog.rs               |  18 +++-
 src/frontend/src/instance.rs              |  14 +--
 src/frontend/src/instance/distributed.rs  |  29 +++++
 14 files changed, 323 insertions(+), 39 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ce65ac5592..550f189efa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1204,6 +1204,7 @@ dependencies = [
  "meta-client",
  "metrics",
  "mito",
+ "moka 0.11.0",
  "object-store",
  "parking_lot",
  "regex",
@@ -3113,7 +3114,7 @@ dependencies = [
  "meter-core",
  "meter-macros",
  "mito",
- "moka",
+ "moka 0.9.7",
  "object-store",
  "openmetrics-parser",
  "partition",
@@ -4775,6 +4776,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "mach2"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "maplit"
 version = "1.0.2"
@@ -4985,7 +4995,7 @@ dependencies = [
  "metrics-util",
  "parking_lot",
  "portable-atomic",
- "quanta",
+ "quanta 0.10.1",
  "thiserror",
 ]
 
@@ -5016,7 +5026,7 @@ dependencies = [
  "ordered-float 2.10.0",
  "parking_lot",
  "portable-atomic",
- "quanta",
+ "quanta 0.10.1",
  "radix_trie",
  "sketches-ddsketch",
 ]
@@ -5115,7 +5125,33 @@ dependencies = [
  "num_cpus",
  "once_cell",
  "parking_lot",
- "quanta",
+ "quanta 0.10.1",
+ "rustc_version 0.4.0",
+ "scheduled-thread-pool",
+ "skeptic",
+ "smallvec",
+ "tagptr",
+ "thiserror",
+ "triomphe",
+ "uuid",
+]
+
+[[package]]
+name = "moka"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "934030d03f6191edbb4ba16835ccdb80d560788ac686570a8e2986a0fb59ded8"
+dependencies = [
+ "async-io",
+ "async-lock",
+ "crossbeam-channel",
+ "crossbeam-epoch",
+ "crossbeam-utils",
+ "futures-util",
+ "num_cpus",
+ "once_cell",
+ "parking_lot",
+ "quanta 0.11.0",
  "rustc_version 0.4.0",
  "scheduled-thread-pool",
  "skeptic",
@@ -5847,7 +5883,7 @@ dependencies = [
  "datafusion-expr",
  "datatypes",
  "meta-client",
- "moka",
+ "moka 0.9.7",
  "serde",
  "serde_json",
  "snafu",
@@ -6578,6 +6614,22 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "quanta"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8cc73c42f9314c4bdce450c77e6f09ecbddefbeddb1b5979ded332a3913ded33"
+dependencies = [
+ "crossbeam-utils",
+ "libc",
+ "mach2",
+ "once_cell",
+ "raw-cpuid",
+ "wasi 0.11.0+wasi-snapshot-preview1",
+ "web-sys",
+ "winapi",
+]
+
 [[package]]
 name = "query"
 version = "0.2.0"
diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml
index 5ceb7b2954..115b4f8c18 100644
--- a/src/catalog/Cargo.toml
+++ b/src/catalog/Cargo.toml
@@ -29,6 +29,7 @@ key-lock = "0.1"
 lazy_static = "1.4"
 meta-client = { path = "../meta-client" }
 metrics.workspace = true
+moka = { version = "0.11", features = ["future"] }
 parking_lot = "0.12"
 regex = "1.6"
 serde = "1.0"
diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs
index 072f382f36..79b2c57ab3 100644
--- a/src/catalog/src/error.rs
+++ b/src/catalog/src/error.rs
@@ -233,6 +233,9 @@ pub enum Error {
         #[snafu(backtrace)]
         source: table::error::Error,
     },
+
+    #[snafu(display("A generic error has occurred, msg: {}", msg))]
+    Generic { msg: String, location: Location },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -253,7 +256,7 @@ impl ErrorExt for Error {
             | Error::EmptyValue { .. }
             | Error::ValueDeserialize { .. } => StatusCode::StorageUnavailable,
 
-            Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
+            Error::Generic { .. } | Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
 
             Error::ReadSystemCatalog { source, .. } | Error::CreateRecordBatch { source } => {
                 source.status_code()
diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs
index ad8a1735cc..09876a2dc0 100644
--- a/src/catalog/src/lib.rs
+++ b/src/catalog/src/lib.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#![feature(trait_upcasting)]
 #![feature(assert_matches)]
 
 use std::any::Any;
diff --git a/src/catalog/src/metrics.rs b/src/catalog/src/metrics.rs
index 7597150207..6e481c15e2 100644
--- a/src/catalog/src/metrics.rs
+++ b/src/catalog/src/metrics.rs
@@ -20,6 +20,9 @@ pub(crate) const METRIC_CATALOG_MANAGER_CATALOG_COUNT: &str = "catalog.catalog_c
 pub(crate) const METRIC_CATALOG_MANAGER_SCHEMA_COUNT: &str = "catalog.schema_count";
 pub(crate) const METRIC_CATALOG_MANAGER_TABLE_COUNT: &str = "catalog.table_count";
 
+pub(crate) const METRIC_CATALOG_KV_REMOTE_GET: &str = "catalog.kv.get.remote";
+pub(crate) const METRIC_CATALOG_KV_GET: &str = "catalog.kv.get";
+
 #[inline]
 pub(crate) fn db_label(catalog: &str, schema: &str) -> (&'static str, String) {
     (METRIC_DB_LABEL, build_db_string(catalog, schema))
diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs
index f66cc40963..431cbbed34 100644
--- a/src/catalog/src/remote.rs
+++ b/src/catalog/src/remote.rs
@@ -16,7 +16,7 @@ use std::fmt::Debug;
 use std::pin::Pin;
 use std::sync::Arc;
 
-pub use client::MetaKvBackend;
+pub use client::CachedMetaKvBackend;
 use futures::Stream;
 use futures_util::StreamExt;
 pub use manager::{RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider};
@@ -74,6 +74,13 @@ pub trait KvBackend: Send + Sync {
 
 pub type KvBackendRef = Arc<dyn KvBackend>;
 
+#[async_trait::async_trait]
+pub trait KvCacheInvalidator: Send + Sync {
+    async fn invalidate_key(&self, key: &[u8]);
+}
+
+pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
+
 #[cfg(test)]
 mod tests {
     use async_stream::stream;
@@ -119,12 +126,16 @@ mod tests {
     #[tokio::test]
     async fn test_get() {
         let backend = MockKvBackend {};
+
         let result = backend.get(0.to_string().as_bytes()).await;
         assert_eq!(0.to_string().as_bytes(), result.unwrap().unwrap().0);
+
         let result = backend.get(1.to_string().as_bytes()).await;
         assert_eq!(1.to_string().as_bytes(), result.unwrap().unwrap().0);
+
         let result = backend.get(2.to_string().as_bytes()).await;
         assert_eq!(2.to_string().as_bytes(), result.unwrap().unwrap().0);
+
         let result = backend.get(3.to_string().as_bytes()).await;
         assert!(result.unwrap().is_none());
     }
diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/remote/client.rs
index b981a89bea..64ca217c06 100644
--- a/src/catalog/src/remote/client.rs
+++ b/src/catalog/src/remote/client.rs
@@ -14,15 +14,124 @@
 
 use std::fmt::Debug;
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_stream::stream;
 use common_meta::rpc::store::{CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest};
-use common_telemetry::info;
+use common_telemetry::{info, timer};
 use meta_client::client::MetaClient;
+use moka::future::{Cache, CacheBuilder};
 use snafu::ResultExt;
 
-use crate::error::{Error, MetaSrvSnafu};
-use crate::remote::{Kv, KvBackend, ValueIter};
+use super::KvCacheInvalidator;
+use crate::error::{Error, GenericSnafu, MetaSrvSnafu, Result};
+use crate::metrics::{METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET};
+use crate::remote::{Kv, KvBackend, KvBackendRef, ValueIter};
+
+const CACHE_MAX_CAPACITY: u64 = 10000;
+const CACHE_TTL_SECOND: u64 = 10 * 60;
+const CACHE_TTI_SECOND: u64 = 5 * 60;
+
+pub struct CachedMetaKvBackend {
+    kv_backend: KvBackendRef,
+    cache: Arc<Cache<Vec<u8>, Option<Kv>>>,
+}
+
+#[async_trait::async_trait]
+impl KvBackend for CachedMetaKvBackend {
+    fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error>
+    where
+        'a: 'b,
+    {
+        self.kv_backend.range(key)
+    }
+
+    async fn get(&self, key: &[u8]) -> Result<Option<Kv>> {
+        let _timer = timer!(METRIC_CATALOG_KV_GET);
+
+        let init = async {
+            let _timer = timer!(METRIC_CATALOG_KV_REMOTE_GET);
+
+            self.kv_backend.get(key).await
+        };
+
+        let schema_provider = self.cache.try_get_with_by_ref(key, init).await;
+        schema_provider.map_err(|e| GenericSnafu { msg: e.to_string() }.build())
+    }
+
+    async fn set(&self, key: &[u8], val: &[u8]) -> Result<()> {
+        let ret = self.kv_backend.set(key, val).await;
+
+        if ret.is_ok() {
+            self.invalidate_key(key).await;
+        }
+
+        ret
+    }
+
+    async fn delete(&self, key: &[u8]) -> Result<()> {
+        let ret = self.kv_backend.delete_range(key, &[]).await;
+
+        if ret.is_ok() {
+            self.invalidate_key(key).await;
+        }
+
+        ret
+    }
+
+    async fn delete_range(&self, _key: &[u8], _end: &[u8]) -> Result<()> {
+        // TODO(fys): implement it
+        unimplemented!()
+    }
+
+    async fn compare_and_set(
+        &self,
+        key: &[u8],
+        expect: &[u8],
+        val: &[u8],
+    ) -> Result<Result<(), Option<Vec<u8>>>> {
+        let ret = self.kv_backend.compare_and_set(key, expect, val).await;
+
+        if ret.is_ok() {
+            self.invalidate_key(key).await;
+        }
+
+        ret
+    }
+}
+
+#[async_trait::async_trait]
+impl KvCacheInvalidator for CachedMetaKvBackend {
+    async fn invalidate_key(&self, key: &[u8]) {
+        self.cache.invalidate(key).await
+    }
+}
+
+impl CachedMetaKvBackend {
+    pub fn new(client: Arc<MetaClient>) -> Self {
+        let cache = Arc::new(
+            CacheBuilder::new(CACHE_MAX_CAPACITY)
+                .time_to_live(Duration::from_secs(CACHE_TTL_SECOND))
+                .time_to_idle(Duration::from_secs(CACHE_TTI_SECOND))
+                .build(),
+        );
+        let kv_backend = Arc::new(MetaKvBackend { client });
+
+        Self { kv_backend, cache }
+    }
+
+    pub fn wrap(kv_backend: KvBackendRef) -> Self {
+        let cache = Arc::new(
+            CacheBuilder::new(CACHE_MAX_CAPACITY)
+                .time_to_live(Duration::from_secs(CACHE_TTL_SECOND))
+                .time_to_idle(Duration::from_secs(CACHE_TTI_SECOND))
+                .build(),
+        );
+
+        Self { kv_backend, cache }
+    }
+}
+
 #[derive(Debug)]
 pub struct MetaKvBackend {
@@ -51,7 +160,7 @@ impl KvBackend for MetaKvBackend {
         }))
     }
 
-    async fn get(&self, key: &[u8]) -> Result<Option<Kv>, Error> {
+    async fn get(&self, key: &[u8]) -> Result<Option<Kv>> {
         let mut response = self
             .client
             .range(RangeRequest::new().with_key(key))
@@ -63,7 +172,7 @@ impl KvBackend for MetaKvBackend {
             .map(|kv| Kv(kv.take_key(), kv.take_value())))
     }
 
-    async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> {
+    async fn set(&self, key: &[u8], val: &[u8]) -> Result<()> {
         let req = PutRequest::new()
             .with_key(key.to_vec())
             .with_value(val.to_vec());
@@ -71,7 +180,7 @@ impl KvBackend for MetaKvBackend {
         Ok(())
     }
 
-    async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> {
+    async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<()> {
         let req = DeleteRangeRequest::new().with_range(key.to_vec(), end.to_vec());
         let resp = self.client.delete_range(req).await.context(MetaSrvSnafu)?;
         info!(
@@ -89,7 +198,7 @@ impl KvBackend for MetaKvBackend {
         key: &[u8],
         expect: &[u8],
         val: &[u8],
-    ) -> Result<Result<(), Option<Vec<u8>>>, Error> {
+    ) -> Result<Result<(), Option<Vec<u8>>>> {
         let request = CompareAndPutRequest::new()
             .with_key(key.to_vec())
             .with_expect(expect.to_vec())
diff --git a/src/catalog/tests/mock.rs b/src/catalog/tests/mock.rs
index b28094351a..70937bb535 100644
--- a/src/catalog/tests/mock.rs
+++ b/src/catalog/tests/mock.rs
@@ -139,12 +139,16 @@ impl KvBackend for MockKvBackend {
     }
 
     async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> {
-        let start = key.to_vec();
-        let end = end.to_vec();
-        let range = start..end;
-
         let mut map = self.map.write().await;
-        map.retain(|k, _| !range.contains(k));
+        if end.is_empty() {
+            let _ = map.remove(key);
+        } else {
+            let start = key.to_vec();
+            let end = end.to_vec();
+            let range = start..end;
+
+            map.retain(|k, _| !range.contains(k));
+        }
         Ok(())
     }
 }
diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs
index 42bc41f7e0..f577844bae 100644
--- a/src/catalog/tests/remote_catalog_tests.rs
+++ b/src/catalog/tests/remote_catalog_tests.rs
@@ -24,7 +24,8 @@ mod tests {
     use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
     use catalog::remote::{
-        KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider,
+        CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider,
+        RemoteSchemaProvider,
     };
     use catalog::{CatalogManager, RegisterTableRequest};
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
@@ -76,6 +77,52 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_cached_backend() {
+        common_telemetry::init_default_ut_logging();
+        let backend = CachedMetaKvBackend::wrap(Arc::new(MockKvBackend::default()));
+
+        let default_catalog_key = CatalogKey {
+            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
+        }
+        .to_string();
+
+        backend
+            .set(
+                default_catalog_key.as_bytes(),
+                &CatalogValue {}.as_bytes().unwrap(),
+            )
+            .await
+            .unwrap();
+
+        let ret = backend.get(b"__c-greptime").await.unwrap();
+        assert!(ret.is_some());
+
+        let _ = backend
+            .compare_and_set(
+                b"__c-greptime",
+                &CatalogValue {}.as_bytes().unwrap(),
+                b"123",
+            )
+            .await
+            .unwrap();
+
+        let ret = backend.get(b"__c-greptime").await.unwrap();
+        assert!(ret.is_some());
+        assert_eq!(&b"123"[..], &(ret.as_ref().unwrap().1));
+
+        let _ = backend.set(b"__c-greptime", b"1234").await;
+
+        let ret = backend.get(b"__c-greptime").await.unwrap();
+        assert!(ret.is_some());
+        assert_eq!(&b"1234"[..], &(ret.as_ref().unwrap().1));
+
+        backend.delete(b"__c-greptime").await.unwrap();
+
+        let ret = backend.get(b"__c-greptime").await.unwrap();
+        assert!(ret.is_none());
+    }
+
     async fn prepare_components(
         node_id: u64,
     ) -> (
@@ -84,17 +131,22 @@ mod tests {
         Arc<RemoteCatalogManager>,
         TableEngineManagerRef,
     ) {
-        let backend = Arc::new(MockKvBackend::default()) as KvBackendRef;
+        let cached_backend = Arc::new(CachedMetaKvBackend::wrap(
+            Arc::new(MockKvBackend::default()),
+        ));
+
         let table_engine = Arc::new(MockTableEngine::default());
         let engine_manager = Arc::new(MemoryTableEngineManager::alias(
             MITO_ENGINE.to_string(),
             table_engine.clone(),
         ));
+
         let catalog_manager =
-            RemoteCatalogManager::new(engine_manager.clone(), node_id, backend.clone());
+            RemoteCatalogManager::new(engine_manager.clone(), node_id, cached_backend.clone());
         catalog_manager.start().await.unwrap();
+
         (
-            backend,
+            cached_backend,
             table_engine,
             Arc::new(catalog_manager),
             engine_manager as Arc<_>,
diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs
index 1ae27a8476..514a36f51a 100644
--- a/src/cmd/src/cli/repl.rs
+++ b/src/cmd/src/cli/repl.rs
@@ -16,7 +16,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Instant;
 
-use catalog::remote::MetaKvBackend;
+use catalog::remote::CachedMetaKvBackend;
 use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::prelude::ErrorExt;
 use common_query::Output;
@@ -253,9 +253,7 @@ async fn create_query_engine(meta_addr: &str) -> Result {
         .context(StartMetaClientSnafu)?;
     let meta_client = Arc::new(meta_client);
 
-    let backend = Arc::new(MetaKvBackend {
-        client: meta_client.clone(),
-    });
+    let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
 
     let table_routes = Arc::new(TableRoutes::new(meta_client));
     let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
@@ -263,7 +261,8 @@ async fn create_query_engine(meta_addr: &str) -> Result {
     let datanode_clients = Arc::new(DatanodeClients::default());
 
     let catalog_list = Arc::new(FrontendCatalogManager::new(
-        backend,
+        cached_meta_backend.clone(),
+        cached_meta_backend,
         partition_manager,
         datanode_clients,
     ));
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 8ac6ae38b5..5903359a27 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -18,7 +18,7 @@ use std::time::Duration;
 use std::{fs, path};
 
 use api::v1::meta::Role;
-use catalog::remote::MetaKvBackend;
+use catalog::remote::CachedMetaKvBackend;
 use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
 use common_base::paths::{CLUSTER_DIR, WAL_DIR};
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
@@ -186,12 +186,14 @@ impl Instance {
             }
             Mode::Distributed => {
+                let kv_backend = Arc::new(CachedMetaKvBackend::new(
+                    meta_client.as_ref().unwrap().clone(),
+                ));
+
                 let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new(
                     engine_manager.clone(),
                     opts.node_id.context(MissingNodeIdSnafu)?,
-                    Arc::new(MetaKvBackend {
-                        client: meta_client.as_ref().unwrap().clone(),
-                    }),
+                    kv_backend,
                 ));
                 (catalog as CatalogManagerRef, None)
             }
diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs
index e90c1832ed..58a58d62a1 100644
--- a/src/frontend/src/catalog.rs
+++ b/src/frontend/src/catalog.rs
@@ -26,7 +26,7 @@ use catalog::helper::{
     build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
     TableGlobalKey, TableGlobalValue,
 };
-use catalog::remote::{Kv, KvBackendRef};
+use catalog::remote::{Kv, KvBackendRef, KvCacheInvalidatorRef};
 use catalog::{
     CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
     RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
@@ -51,6 +51,7 @@ use crate::table::DistTable;
 #[derive(Clone)]
 pub struct FrontendCatalogManager {
     backend: KvBackendRef,
+    backend_cache_invalidtor: KvCacheInvalidatorRef,
     partition_manager: PartitionRuleManagerRef,
     datanode_clients: Arc<DatanodeClients>,
 
@@ -64,11 +65,13 @@
 impl FrontendCatalogManager {
     pub fn new(
         backend: KvBackendRef,
+        backend_cache_invalidtor: KvCacheInvalidatorRef,
         partition_manager: PartitionRuleManagerRef,
         datanode_clients: Arc<DatanodeClients>,
     ) -> Self {
         Self {
             backend,
+            backend_cache_invalidtor,
             partition_manager,
             datanode_clients,
             dist_instance: None,
@@ -90,6 +93,19 @@ impl FrontendCatalogManager {
     pub fn datanode_clients(&self) -> Arc<DatanodeClients> {
         self.datanode_clients.clone()
     }
+
+    pub async fn invalidate_table(&self, catalog: &str, schema: &str, table: &str) {
+        let tg_key = TableGlobalKey {
+            catalog_name: catalog.into(),
+            schema_name: schema.into(),
+            table_name: table.into(),
+        }
+        .to_string();
+
+        let tg_key = tg_key.as_bytes();
+
+        self.backend_cache_invalidtor.invalidate_key(tg_key).await;
+    }
 }
 
 // FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 741f6ed98b..a550ac57e1 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -30,7 +30,7 @@ use api::v1::greptime_request::Request;
 use api::v1::meta::Role;
 use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest};
 use async_trait::async_trait;
-use catalog::remote::MetaKvBackend;
+use catalog::remote::CachedMetaKvBackend;
 use catalog::CatalogManagerRef;
 use common_base::Plugins;
 use common_catalog::consts::MITO_ENGINE;
@@ -137,14 +137,16 @@ impl Instance {
         datanode_clients: Arc<DatanodeClients>,
         plugins: Arc<Plugins>,
     ) -> Result<Self> {
-        let meta_backend = Arc::new(MetaKvBackend {
-            client: meta_client.clone(),
-        });
+        let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
         let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
         let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
 
-        let mut catalog_manager =
-            FrontendCatalogManager::new(meta_backend, partition_manager, datanode_clients.clone());
+        let mut catalog_manager = FrontendCatalogManager::new(
+            meta_backend.clone(),
+            meta_backend,
+            partition_manager,
+            datanode_clients.clone(),
+        );
 
         let dist_instance = DistInstance::new(
             meta_client.clone(),
diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs
index c9e6512da4..fbeecdb5b4 100644
--- a/src/frontend/src/instance/distributed.rs
+++ b/src/frontend/src/instance/distributed.rs
@@ -202,6 +202,19 @@ impl DistInstance {
                 .await
                 .context(RequestDatanodeSnafu)?;
         }
+
+        // The table information created on meta does not go through KvBackend, so we
+        // manually invalidate the cache here.
+        //
+        // TODO(fys): when the meta invalidation cache mechanism is established, remove it.
+        self.catalog_manager
+            .invalidate_table(
+                &table_name.catalog_name,
+                &table_name.schema_name,
+                &table_name.table_name,
+            )
+            .await;
+
         Ok(table)
     }
 
@@ -260,6 +273,18 @@ impl DistInstance {
             }
         }
 
+        // The table information dropped on meta does not go through KvBackend, so we
+        // manually invalidate the cache here.
+        //
+        // TODO(fys): when the meta invalidation cache mechanism is established, remove it.
+        self.catalog_manager()
+            .invalidate_table(
+                &table_name.catalog_name,
+                &table_name.schema_name,
+                &table_name.table_name,
+            )
+            .await;
+
         Ok(Output::AffectedRows(1))
     }
 
@@ -470,12 +495,15 @@ impl DistInstance {
         } else {
             expr.catalog_name.as_str()
         };
+
         let schema_name = if expr.schema_name.is_empty() {
             DEFAULT_SCHEMA_NAME
         } else {
             expr.schema_name.as_str()
         };
+
         let table_name = expr.table_name.as_str();
+
         let table = self
             .catalog_manager
             .table(catalog_name, schema_name, table_name)
@@ -489,6 +517,7 @@ impl DistInstance {
             .context(AlterExprToRequestSnafu)?;
 
         let mut context = AlterContext::with_capacity(1);
+        context.insert(expr);
 
         table.alter(context, &request).await.context(TableSnafu)?;