mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 01:40:36 +00:00
fix: can not find catalog when create table (#1118)
* fix: get catalog by name in RemoteCatalogManager * cr * cr * cr * fix: ut failed
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1100,6 +1100,7 @@ dependencies = [
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
"dashmap",
|
||||
"datafusion",
|
||||
"datatypes",
|
||||
"futures",
|
||||
@@ -1109,6 +1110,7 @@ dependencies = [
|
||||
"meta-client",
|
||||
"mito",
|
||||
"object-store",
|
||||
"parking_lot",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
||||
@@ -18,12 +18,14 @@ common-recordbatch = { path = "../common/recordbatch" }
|
||||
common-runtime = { path = "../common/runtime" }
|
||||
common-telemetry = { path = "../common/telemetry" }
|
||||
common-time = { path = "../common/time" }
|
||||
dashmap = "5.4"
|
||||
datafusion.workspace = true
|
||||
datatypes = { path = "../datatypes" }
|
||||
futures = "0.3"
|
||||
futures-util.workspace = true
|
||||
lazy_static = "1.4"
|
||||
meta-client = { path = "../meta-client" }
|
||||
parking_lot = "0.12"
|
||||
regex = "1.6"
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -22,8 +22,10 @@ use async_stream::stream;
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
|
||||
use common_telemetry::{debug, error, info};
|
||||
use dashmap::DashMap;
|
||||
use futures::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use parking_lot::RwLock;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::engine::{EngineContext, TableEngineRef};
|
||||
use table::metadata::TableId;
|
||||
@@ -39,6 +41,7 @@ use crate::error::{
|
||||
use crate::helper::{
|
||||
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
|
||||
SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
|
||||
CATALOG_KEY_PREFIX,
|
||||
};
|
||||
use crate::remote::{Kv, KvBackendRef};
|
||||
use crate::{
|
||||
@@ -51,10 +54,9 @@ use crate::{
|
||||
pub struct RemoteCatalogManager {
|
||||
node_id: u64,
|
||||
backend: KvBackendRef,
|
||||
catalogs: Arc<ArcSwap<HashMap<String, CatalogProviderRef>>>,
|
||||
catalogs: Arc<RwLock<DashMap<String, CatalogProviderRef>>>,
|
||||
engine: TableEngineRef,
|
||||
system_table_requests: Mutex<Vec<RegisterSystemTableRequest>>,
|
||||
mutex: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
impl RemoteCatalogManager {
|
||||
@@ -65,7 +67,6 @@ impl RemoteCatalogManager {
|
||||
backend,
|
||||
catalogs: Default::default(),
|
||||
system_table_requests: Default::default(),
|
||||
mutex: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -386,7 +387,14 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
"Initialized catalogs: {:?}",
|
||||
catalogs.keys().cloned().collect::<Vec<_>>()
|
||||
);
|
||||
self.catalogs.store(Arc::new(catalogs));
|
||||
|
||||
{
|
||||
let self_catalogs = self.catalogs.read();
|
||||
catalogs.into_iter().for_each(|(k, v)| {
|
||||
self_catalogs.insert(k, v);
|
||||
});
|
||||
}
|
||||
|
||||
info!("Max table id allocated: {}", max_table_id);
|
||||
|
||||
let mut system_table_requests = self.system_table_requests.lock().await;
|
||||
@@ -504,12 +512,10 @@ impl CatalogList for RemoteCatalogManager {
|
||||
) -> Result<Option<CatalogProviderRef>> {
|
||||
let key = self.build_catalog_key(&name).to_string();
|
||||
let backend = self.backend.clone();
|
||||
let mutex = self.mutex.clone();
|
||||
let catalogs = self.catalogs.clone();
|
||||
|
||||
std::thread::spawn(|| {
|
||||
common_runtime::block_on_write(async move {
|
||||
let _guard = mutex.lock().await;
|
||||
backend
|
||||
.set(
|
||||
key.as_bytes(),
|
||||
@@ -518,11 +524,10 @@ impl CatalogList for RemoteCatalogManager {
|
||||
.context(InvalidCatalogValueSnafu)?,
|
||||
)
|
||||
.await?;
|
||||
let prev_catalogs = catalogs.load();
|
||||
let mut new_catalogs = HashMap::with_capacity(prev_catalogs.len() + 1);
|
||||
new_catalogs.clone_from(&prev_catalogs);
|
||||
let prev = new_catalogs.insert(name, catalog);
|
||||
catalogs.store(Arc::new(new_catalogs));
|
||||
|
||||
let catalogs = catalogs.read();
|
||||
let prev = catalogs.insert(name, catalog.clone());
|
||||
|
||||
Ok(prev)
|
||||
})
|
||||
})
|
||||
@@ -532,12 +537,65 @@ impl CatalogList for RemoteCatalogManager {
|
||||
|
||||
/// List all catalogs from metasrv
|
||||
fn catalog_names(&self) -> Result<Vec<String>> {
|
||||
Ok(self.catalogs.load().keys().cloned().collect::<Vec<_>>())
|
||||
let catalogs = self.catalogs.read();
|
||||
Ok(catalogs.iter().map(|k| k.key().to_string()).collect())
|
||||
}
|
||||
|
||||
/// Read catalog info of given name from metasrv.
|
||||
fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>> {
|
||||
Ok(self.catalogs.load().get(name).cloned())
|
||||
{
|
||||
let catalogs = self.catalogs.read();
|
||||
let catalog = catalogs.get(name);
|
||||
|
||||
if let Some(catalog) = catalog {
|
||||
return Ok(Some(catalog.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
let catalogs = self.catalogs.write();
|
||||
|
||||
let catalog = catalogs.get(name);
|
||||
if let Some(catalog) = catalog {
|
||||
return Ok(Some(catalog.clone()));
|
||||
}
|
||||
|
||||
// It's for lack of incremental catalog syncing between datanode and meta. Here we fetch catalog
|
||||
// from meta on demand. This can be removed when incremental catalog syncing is done in datanode.
|
||||
|
||||
let backend = self.backend.clone();
|
||||
|
||||
let catalogs_from_meta: HashSet<String> = std::thread::spawn(|| {
|
||||
common_runtime::block_on_read(async move {
|
||||
let mut stream = backend.range(CATALOG_KEY_PREFIX.as_bytes());
|
||||
let mut catalogs = HashSet::new();
|
||||
|
||||
while let Some(catalog) = stream.next().await {
|
||||
if let Ok(catalog) = catalog {
|
||||
let catalog_key = String::from_utf8_lossy(&catalog.0);
|
||||
|
||||
if let Ok(key) = CatalogKey::parse(&catalog_key) {
|
||||
catalogs.insert(key.catalog_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
catalogs
|
||||
})
|
||||
})
|
||||
.join()
|
||||
.unwrap();
|
||||
|
||||
catalogs.retain(|catalog_name, _| catalogs_from_meta.get(catalog_name).is_some());
|
||||
|
||||
for catalog in catalogs_from_meta {
|
||||
catalogs
|
||||
.entry(catalog.clone())
|
||||
.or_insert(self.new_catalog_provider(&catalog));
|
||||
}
|
||||
|
||||
let catalog = catalogs.get(name);
|
||||
|
||||
Ok(catalog.as_deref().cloned())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user