refactor: remote catalog uses memory (#1926)

* refactor: remote catalog uses memory

* rebase develop

* fix: resolve PR comments
Author: LFC
Date: 2023-07-12 17:33:33 +08:00
Committed by: GitHub
Parent: 39091421a4
Commit: 4fdb6d2f21
41 changed files with 529 additions and 528 deletions
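In short: RemoteCatalogManager now embeds a MemoryCatalogManager and delegates table/schema/catalog registration and lookups to it, instead of reading metadata back from the remote kv backend on every call; the backend is consulted only to discover entries, which are then mirrored into memory. A minimal, hypothetical Rust sketch of that delegation pattern (simplified types, not the actual GreptimeDB catalog traits):

use std::sync::{Arc, RwLock};

// Simplified stand-in for the in-memory catalog manager.
#[derive(Default)]
struct MemoryCatalogManager {
    tables: RwLock<Vec<String>>,
}

impl MemoryCatalogManager {
    fn register_table_sync(&self, name: &str) -> bool {
        let mut tables = self.tables.write().unwrap();
        if tables.iter().any(|t| t.as_str() == name) {
            return false; // already registered
        }
        tables.push(name.to_string());
        true
    }

    fn table_exist(&self, name: &str) -> bool {
        self.tables.read().unwrap().iter().any(|t| t.as_str() == name)
    }
}

// The remote manager forwards to the in-memory one; the kv backend is omitted here.
struct RemoteCatalogManager {
    memory_catalog_manager: Arc<MemoryCatalogManager>,
}

impl RemoteCatalogManager {
    fn register_table(&self, name: &str) -> bool {
        self.memory_catalog_manager.register_table_sync(name)
    }

    fn table_exist(&self, name: &str) -> bool {
        self.memory_catalog_manager.table_exist(name)
    }
}

fn main() {
    let manager = RemoteCatalogManager {
        memory_catalog_manager: Arc::new(MemoryCatalogManager::default()),
    };
    assert!(manager.register_table("numbers"));
    assert!(!manager.register_table("numbers")); // duplicate registration is rejected
    assert!(manager.table_exist("numbers"));
}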

Cargo.lock (generated)

@@ -1595,6 +1595,7 @@ dependencies = [
"client",
"common-base",
"common-error",
"common-meta",
"common-query",
"common-recordbatch",
"common-telemetry",


@@ -29,12 +29,12 @@ datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util.workspace = true
key-lock = "0.1"
lazy_static = "1.4"
lazy_static.workspace = true
meta-client = { path = "../meta-client" }
metrics.workspace = true
moka = { version = "0.11", features = ["future"] }
parking_lot = "0.12"
regex = "1.6"
regex.workspace = true
serde = "1.0"
serde_json = "1.0"
session = { path = "../session" }


@@ -525,8 +525,6 @@ impl CatalogManager for LocalCatalogManager {
}
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
self.check_state().await?;
let catalog_name = request.create_table_request.catalog_name.clone();
let schema_name = request.create_table_request.schema_name.clone();


@@ -75,15 +75,7 @@ impl CatalogManager for MemoryCatalogManager {
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let catalog = request.catalog.clone();
let schema = request.schema.clone();
let result = self.register_table_sync(request);
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&catalog, &schema)],
);
result
self.register_table_sync(request)
}
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
@@ -143,11 +135,7 @@ impl CatalogManager for MemoryCatalogManager {
}
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
let registered = self.register_schema_sync(request)?;
if registered {
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
}
Ok(registered)
self.register_schema_sync(request)
}
async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result<bool> {
@@ -263,11 +251,7 @@ impl CatalogManager for MemoryCatalogManager {
}
async fn register_catalog(&self, name: String) -> Result<bool> {
let registered = self.register_catalog_sync(name)?;
if registered {
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
}
Ok(registered)
self.register_catalog_sync(name)
}
fn as_any(&self) -> &dyn Any {
@@ -291,7 +275,15 @@ impl MemoryCatalogManager {
pub fn register_catalog_sync(&self, name: String) -> Result<bool> {
let mut catalogs = self.catalogs.write().unwrap();
Ok(catalogs.insert(name, HashMap::new()).is_some())
match catalogs.entry(name) {
Entry::Vacant(e) => {
e.insert(HashMap::new());
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
Ok(true)
}
Entry::Occupied(_) => Ok(false),
}
}
pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result<bool> {
@@ -301,11 +293,15 @@ impl MemoryCatalogManager {
.with_context(|| CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?;
if catalog.contains_key(&request.schema) {
return Ok(false);
match catalog.entry(request.schema) {
Entry::Vacant(e) => {
e.insert(HashMap::new());
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
Ok(true)
}
Entry::Occupied(_) => Ok(false),
}
let _ = catalog.insert(request.schema, HashMap::new());
Ok(true)
}
pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result<bool> {
@@ -327,8 +323,13 @@ impl MemoryCatalogManager {
}
.fail();
}
Ok(schema.insert(request.table_name, request.table).is_none())
schema.insert(request.table_name, request.table);
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&request.catalog, &request.schema)],
);
Ok(true)
}
#[cfg(any(test, feature = "testing"))]
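The register_catalog_sync and register_schema_sync methods above switch to the HashMap Entry API so that a key is inserted only when it is absent and the gauge metrics are incremented exactly once, on first registration. A standalone sketch of the idiom, with toy types and a plain return value standing in for the metrics calls:

use std::collections::hash_map::Entry;
use std::collections::HashMap;

// Insert only when the key is vacant and report whether the insertion happened;
// in the real code the Vacant arm is where the catalog-count gauge is incremented.
fn register_catalog(catalogs: &mut HashMap<String, HashMap<String, ()>>, name: String) -> bool {
    match catalogs.entry(name) {
        Entry::Vacant(e) => {
            e.insert(HashMap::new());
            true // newly registered
        }
        Entry::Occupied(_) => false, // already present, no metric update
    }
}

fn main() {
    let mut catalogs = HashMap::new();
    assert!(register_catalog(&mut catalogs, "greptime".to_string()));
    assert!(!register_catalog(&mut catalogs, "greptime".to_string()));
}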


@@ -13,34 +13,34 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;
use async_trait::async_trait;
use common_catalog::consts::{MAX_SYS_TABLE_ID, MITO_ENGINE};
use common_catalog::consts::MITO_ENGINE;
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::{PutRequest, RangeRequest};
use common_meta::rpc::KeyValue;
use common_telemetry::{debug, error, info, warn};
use metrics::{decrement_gauge, increment_gauge};
use snafu::ResultExt;
use metrics::increment_gauge;
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
use table::engine::{EngineContext, TableReference};
use table::requests::{CreateTableRequest, OpenTableRequest};
use table::requests::OpenTableRequest;
use table::TableRef;
use tokio::sync::Mutex;
use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu,
TableMetadataManagerSnafu, UnimplementedSnafu,
InvalidCatalogValueSnafu, OpenTableSnafu, ParallelOpenTableSnafu, Result,
TableEngineNotFoundSnafu, TableExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnimplementedSnafu,
};
use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix,
build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey,
TableGlobalValue, TableRegionalKey, TableRegionalValue, CATALOG_KEY_PREFIX,
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use crate::local::MemoryCatalogManager;
use crate::remote::region_alive_keeper::RegionAliveKeepers;
use crate::{
handle_system_table_request, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest,
@@ -54,6 +54,8 @@ pub struct RemoteCatalogManager {
engine_manager: TableEngineManagerRef,
system_table_requests: Mutex<Vec<RegisterSystemTableRequest>>,
region_alive_keepers: Arc<RegionAliveKeepers>,
memory_catalog_manager: Arc<MemoryCatalogManager>,
table_metadata_manager: TableMetadataManagerRef,
}
impl RemoteCatalogManager {
@@ -62,6 +64,7 @@ impl RemoteCatalogManager {
node_id: u64,
backend: KvBackendRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
engine_manager,
@@ -69,6 +72,8 @@ impl RemoteCatalogManager {
backend,
system_table_requests: Default::default(),
region_alive_keepers,
memory_catalog_manager: Arc::new(MemoryCatalogManager::default()),
table_metadata_manager,
}
}
@@ -120,40 +125,6 @@ impl RemoteCatalogManager {
Ok(())
}
pub async fn create_catalog_and_schema(
&self,
catalog_name: &str,
schema_name: &str,
) -> Result<()> {
let schema_key = SchemaKey {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
}
.to_string();
let req = PutRequest::new()
.with_key(schema_key.as_bytes())
.with_value(SchemaValue.as_bytes().context(InvalidCatalogValueSnafu)?);
self.backend
.put(req)
.await
.context(TableMetadataManagerSnafu)?;
info!("Created schema '{schema_key}'");
let catalog_key = CatalogKey {
catalog_name: catalog_name.to_string(),
}
.to_string();
let req = PutRequest::new()
.with_key(catalog_key.as_bytes())
.with_value(CatalogValue.as_bytes().context(InvalidCatalogValueSnafu)?);
self.backend
.put(req)
.await
.context(TableMetadataManagerSnafu)?;
info!("Created catalog '{catalog_key}");
Ok(())
}
fn build_schema_key(&self, catalog_name: String, schema_name: String) -> SchemaKey {
SchemaKey {
catalog_name,
@@ -170,74 +141,39 @@ impl RemoteCatalogManager {
engine_manager: TableEngineManagerRef,
catalog_name: String,
schema_name: String,
) -> Result<u32> {
) -> Result<()> {
info!("initializing tables in {}.{}", catalog_name, schema_name);
let kvs = iter_remote_tables(node_id, &backend, &catalog_name, &schema_name).await?;
let table_num = kvs.len();
let joins = kvs
.into_iter()
.map(|(table_key, table_value)| {
.map(|(_, table_value)| {
let engine_manager = engine_manager.clone();
let backend = backend.clone();
let memory_catalog_manager = self.memory_catalog_manager.clone();
let table_metadata_manager = self.table_metadata_manager.clone();
let table_id = table_value.table_id();
common_runtime::spawn_bg(async move {
match open_or_create_table(node_id, engine_manager, &table_key, &table_value)
.await
if let Err(e) = open_and_register_table(
node_id,
engine_manager,
&table_value,
memory_catalog_manager,
table_metadata_manager,
)
.await
{
Ok(table_ref) => {
let table_info = table_ref.table_info();
let table_name = &table_info.name;
info!("Registered table {}", table_name);
Ok(Some(table_info.ident.table_id))
}
Err(err) => {
warn!(
"Node id: {}, failed to open table: {}, source: {}",
node_id, table_key, err
);
debug!(
"Node id: {}, TableGlobalKey: {}, value: {:?},",
node_id, table_key, table_value
);
print_regional_key_debug_info(node_id, backend, &table_key).await;
Ok(None)
}
// Note that we don't return an error here if the table failed to open. This is because
// we don't want those broken tables to impede the startup of Datanode.
// However, this could be changed in the future.
error!(e; "Failed to open or register table, id = {table_id}")
}
})
})
.collect::<Vec<_>>();
let opened_table_ids = futures::future::try_join_all(joins)
futures::future::try_join_all(joins)
.await
.context(ParallelOpenTableSnafu)?
.into_iter()
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect::<Vec<_>>();
let opened = opened_table_ids.len();
let max_table_id = opened_table_ids
.into_iter()
.max()
.unwrap_or(MAX_SYS_TABLE_ID);
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
table_num as f64,
&[crate::metrics::db_label(&catalog_name, &schema_name)],
);
info!(
"initialized tables in {}.{}, total: {}, opened: {}, failed: {}",
catalog_name,
schema_name,
table_num,
opened,
table_num - opened
);
Ok(max_table_id)
.context(ParallelOpenTableSnafu)?;
Ok(())
}
/// Initiates all schemas inside the catalog by fetching data from metasrv.
@@ -275,19 +211,7 @@ impl RemoteCatalogManager {
));
}
let mut max_table_id = MAX_SYS_TABLE_ID;
if let Some(found_max_table_id) = futures::future::try_join_all(joins)
.await?
.into_iter()
.max()
{
max_table_id = max_table_id.max(found_max_table_id);
info!(
"Catalog name: {}, max table id allocated: {}",
catalog_name, max_table_id
);
}
futures::future::try_join_all(joins).await?;
Ok(())
}
@@ -402,24 +326,6 @@ impl RemoteCatalogManager {
node_id: self.node_id,
}
}
async fn check_catalog_schema_exist(
&self,
catalog_name: &str,
schema_name: &str,
) -> Result<()> {
if !self.catalog_exist(catalog_name).await? {
return CatalogNotFoundSnafu { catalog_name }.fail()?;
}
if !self.schema_exist(catalog_name, schema_name).await? {
return SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
}
.fail()?;
}
Ok(())
}
}
async fn iter_remote_schemas<'a>(
@@ -495,59 +401,14 @@ async fn iter_remote_tables<'a>(
Ok(tables)
}
async fn print_regional_key_debug_info(
node_id: u64,
backend: KvBackendRef,
table_key: &TableGlobalKey,
) {
let regional_key = TableRegionalKey {
catalog_name: table_key.catalog_name.clone(),
schema_name: table_key.schema_name.clone(),
table_name: table_key.table_name.clone(),
node_id,
}
.to_string();
match backend.get(regional_key.as_bytes()).await {
Ok(Some(KeyValue {
key: _,
value: values_bytes,
})) => {
debug!(
"Node id: {}, TableRegionalKey: {}, value: {},",
node_id,
table_key,
String::from_utf8_lossy(&values_bytes),
);
}
Ok(None) => {
debug!(
"Node id: {}, TableRegionalKey: {}, value: None",
node_id, table_key,
);
}
Err(err) => {
debug!(
"Node id: {}, failed to fetch TableRegionalKey: {}, source: {}",
node_id, regional_key, err
);
}
}
}
async fn open_or_create_table(
async fn open_and_register_table(
node_id: u64,
engine_manager: TableEngineManagerRef,
table_key: &TableGlobalKey,
table_value: &TableGlobalValue,
) -> Result<TableRef> {
memory_catalog_manager: Arc<MemoryCatalogManager>,
_table_metadata_manager: TableMetadataManagerRef,
) -> Result<()> {
let context = EngineContext {};
let TableGlobalKey {
catalog_name,
schema_name,
table_name,
..
} = table_key;
let table_id = table_value.table_id();
@@ -557,6 +418,10 @@ async fn open_or_create_table(
..
} = table_value;
let catalog_name = table_info.catalog_name.clone();
let schema_name = table_info.schema_name.clone();
let table_name = table_info.name.clone();
// unwrap safety: checked when yielding this table in `iter_remote_tables`
let region_numbers = regions_id_map.get(&node_id).unwrap();
@@ -573,51 +438,42 @@ async fn open_or_create_table(
.context(TableEngineNotFoundSnafu {
engine_name: &table_info.meta.engine,
})?;
match engine
let table_ident = TableIdent {
catalog: catalog_name,
schema: schema_name,
table: table_name,
table_id,
engine: table_info.meta.engine.clone(),
};
let table = engine
.open_table(&context, request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"),
})? {
Some(table) => {
info!(
"Table opened: {}.{}.{}",
catalog_name, schema_name, table_name
);
Ok(table)
}
None => {
info!(
"Try create table: {}.{}.{}",
catalog_name, schema_name, table_name
);
table_info: table_ident.to_string(),
})?
.with_context(|| TableNotFoundSnafu {
table_info: table_ident.to_string(),
})?;
info!("Successfully opened table, {table_ident}");
let meta = &table_info.meta;
let req = CreateTableRequest {
id: table_id,
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
desc: None,
schema: meta.schema.clone(),
region_numbers: region_numbers.clone(),
primary_key_indices: meta.primary_key_indices.clone(),
create_if_not_exists: true,
table_options: meta.options.clone(),
engine: engine.name().to_string(),
};
engine
.create_table(&context, req)
.await
.context(CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id:{}",
&catalog_name, &schema_name, &table_name, table_id
),
})
let request = RegisterTableRequest {
catalog: table_ident.catalog.clone(),
schema: table_ident.schema.clone(),
table_name: table_ident.table.clone(),
table_id,
table,
};
let registered = memory_catalog_manager.register_table_sync(request)?;
ensure!(
registered,
TableExistsSnafu {
table: table_ident.to_string(),
}
}
);
info!("Successfully registered table, {table_ident}");
Ok(())
}
#[async_trait]
@@ -638,92 +494,70 @@ impl CatalogManager for RemoteCatalogManager {
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let catalog_name = request.catalog;
let schema_name = request.schema;
self.check_catalog_schema_exist(&catalog_name, &schema_name)
.await?;
let catalog = request.catalog.clone();
let schema = request.schema.clone();
let table_name = request.table_name.clone();
let table = request.table.clone();
let _ = self
.register_table(
catalog_name.clone(),
schema_name.clone(),
request.table_name,
request.table.clone(),
)
.await?;
let registered = self.memory_catalog_manager.register_table_sync(request)?;
let table_info = request.table.table_info();
let table_ident = TableIdent {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table: table_info.name.clone(),
table_id: table_info.ident.table_id,
engine: table_info.meta.engine.clone(),
};
self.region_alive_keepers
.register_table(table_ident, request.table)
.await?;
if registered {
self.register_table(catalog, schema, table_name, table.clone())
.await?;
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&catalog_name, &schema_name)],
);
Ok(true)
let table_info = table.table_info();
let table_ident = TableIdent {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table: table_info.name.clone(),
table_id: table_info.table_id(),
engine: table_info.meta.engine.clone(),
};
self.region_alive_keepers
.register_table(table_ident, table)
.await?;
}
Ok(registered)
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {
let catalog_name = request.catalog;
let schema_name = request.schema;
let table_name = request.table_name;
self.check_catalog_schema_exist(&catalog_name, &schema_name)
.await?;
let Some(table) = self
.memory_catalog_manager
.table(&request.catalog, &request.schema, &request.table_name)
.await? else { return Ok(()) };
let result = self
.deregister_table(
catalog_name.clone(),
schema_name.clone(),
table_name.clone(),
)
.await?;
decrement_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&catalog_name, &schema_name)],
);
self.deregister_table(
request.catalog.clone(),
request.schema.clone(),
request.table_name.clone(),
)
.await?;
if let Some(table) = result.as_ref() {
let table_info = table.table_info();
let table_ident: TableIdent = TableIdent {
catalog: catalog_name,
schema: schema_name,
table: table_name,
table_id: table_info.ident.table_id,
engine: table_info.meta.engine.clone(),
};
let _ = self
.region_alive_keepers
.deregister_table(&table_ident)
.await;
let table_info = table.table_info();
let table_ident = TableIdent {
catalog: request.catalog.clone(),
schema: request.schema.clone(),
table: request.table_name.clone(),
table_id: table_info.ident.table_id,
engine: table_info.meta.engine.clone(),
};
if let Some(keeper) = self
.region_alive_keepers
.deregister_table(&table_ident)
.await
{
warn!(
"Table {} is deregistered from region alive keepers",
keeper.table_ident(),
);
}
Ok(())
self.memory_catalog_manager.deregister_table(request).await
}
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
let catalog_name = request.catalog;
let schema_name = request.schema;
let key = self.build_schema_key(catalog_name, schema_name).to_string();
let req = PutRequest::new()
.with_key(key.as_bytes())
.with_value(SchemaValue.as_bytes().context(InvalidCatalogValueSnafu)?);
self.backend
.put(req)
.await
.context(TableMetadataManagerSnafu)?;
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
Ok(true)
self.memory_catalog_manager.register_schema_sync(request)
}
async fn deregister_schema(&self, _request: DeregisterSchemaRequest) -> Result<bool> {
@@ -734,42 +568,7 @@ impl CatalogManager for RemoteCatalogManager {
}
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
let catalog_name = request.catalog.clone();
let schema_name = request.schema.clone();
self.check_catalog_schema_exist(&catalog_name, &schema_name)
.await?;
let old_table_key = TableRegionalKey {
catalog_name: request.catalog.clone(),
schema_name: request.schema.clone(),
table_name: request.table_name.clone(),
node_id: self.node_id,
}
.to_string();
let Some(KeyValue{ key: _, value }) = self.backend
.get(old_table_key.as_bytes())
.await
.context(TableMetadataManagerSnafu)? else {
return Ok(false)
};
let new_table_key = TableRegionalKey {
catalog_name: request.catalog.clone(),
schema_name: request.schema.clone(),
table_name: request.new_table_name,
node_id: self.node_id,
};
let req = PutRequest::new()
.with_key(new_table_key.to_string().as_bytes())
.with_value(value);
self.backend
.put(req)
.await
.context(TableMetadataManagerSnafu)?;
self.backend
.delete(old_table_key.to_string().as_bytes(), false)
.await
.context(TableMetadataManagerSnafu)?;
Ok(true)
self.memory_catalog_manager.rename_table(request).await
}
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
@@ -787,15 +586,45 @@ impl CatalogManager for RemoteCatalogManager {
}
async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
if !self.catalog_exist(catalog).await? {
return Ok(false);
}
if self
.memory_catalog_manager
.schema_exist(catalog, schema)
.await?
{
return Ok(true);
}
let key = self
.build_schema_key(catalog.to_string(), schema.to_string())
.to_string();
Ok(self
let remote_schema_exists = self
.backend
.get(key.as_bytes())
.await
.context(TableMetadataManagerSnafu)?
.is_some())
.is_some();
// Create schema locally if remote schema exists. Since local schema is managed by memory
// catalog manager, creating a local schema is relatively cheap (just a HashMap).
// Besides, if this method ("schema_exist") is called, it's very likely that someone wants to
// create a table in this schema. We should create the schema now.
if remote_schema_exists
&& self
.memory_catalog_manager
.register_schema(RegisterSchemaRequest {
catalog: catalog.to_string(),
schema: schema.to_string(),
})
.await?
{
info!("register schema '{catalog}/{schema}' on demand");
}
Ok(remote_schema_exists)
}
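The comment above describes a read-through registration: when a schema (or, below, a catalog) is found in the remote backend but not in memory, it is registered locally on the spot so later lookups are served from memory alone. A simplified, synchronous sketch of that flow, with hypothetical stand-ins for the in-memory manager and the kv backend:

use std::collections::HashSet;
use std::sync::RwLock;

struct SchemaRegistry {
    local: RwLock<HashSet<String>>, // stand-in for MemoryCatalogManager
    remote: HashSet<String>,        // stand-in for the remote kv backend
}

impl SchemaRegistry {
    fn schema_exist(&self, schema: &str) -> bool {
        if self.local.read().unwrap().contains(schema) {
            return true;
        }
        let remote_schema_exists = self.remote.contains(schema);
        if remote_schema_exists {
            // Mirror the schema locally on demand; the next call stays in memory.
            self.local.write().unwrap().insert(schema.to_string());
        }
        remote_schema_exists
    }
}

fn main() {
    let registry = SchemaRegistry {
        local: RwLock::new(HashSet::new()),
        remote: HashSet::from(["public".to_string()]),
    };
    assert!(registry.schema_exist("public"));  // found remotely, mirrored locally
    assert!(registry.schema_exist("public"));  // now served from memory
    assert!(!registry.schema_exist("missing"));
}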
async fn table(
@@ -804,161 +633,73 @@ impl CatalogManager for RemoteCatalogManager {
schema_name: &str,
table_name: &str,
) -> Result<Option<TableRef>> {
self.check_catalog_schema_exist(catalog_name, schema_name)
.await?;
let key = self
.build_regional_table_key(
catalog_name.to_string(),
schema_name.to_string(),
table_name.to_string(),
)
.to_string();
let table_opt = self
.backend
.get(key.as_bytes())
self.memory_catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(TableMetadataManagerSnafu)?
.map(|KeyValue { key: _, value: v }| {
let TableRegionalValue {
table_id,
engine_name,
..
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
let Some(table_id) = table_id else {
warn!("Cannot find table id for {key}, the value has an old format");
return Ok(None);
};
let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE);
let engine = self
.engine_manager
.engine(engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?;
let reference = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
};
let table = engine
.get_table(&EngineContext {}, table_id)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;
Ok(table)
})
.transpose()?
.flatten();
Ok(table_opt)
}
async fn catalog_exist(&self, catalog: &str) -> Result<bool> {
if self.memory_catalog_manager.catalog_exist(catalog).await? {
return Ok(true);
}
let key = CatalogKey {
catalog_name: catalog.to_string(),
};
Ok(self
let remote_catalog_exists = self
.backend
.get(key.to_string().as_bytes())
.await
.context(TableMetadataManagerSnafu)?
.is_some())
.is_some();
// Create catalog locally if remote catalog exists. Since local catalog is managed by memory
// catalog manager, creating a local catalog is relatively cheap (just a HashMap).
// Besides, if this method ("catalog_exist") is called, it's very likely that someone wants to
// create a table in this catalog. We should create the catalog now.
if remote_catalog_exists
&& self
.memory_catalog_manager
.register_catalog(catalog.to_string())
.await?
{
info!("register catalog '{catalog}' on demand");
}
Ok(remote_catalog_exists)
}
async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
self.check_catalog_schema_exist(catalog, schema).await?;
let key = TableRegionalKey {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
node_id: self.node_id,
if !self.catalog_exist(catalog).await? {
return Ok(false);
}
.to_string();
Ok(self
.backend
.get(key.as_bytes())
if !self.schema_exist(catalog, schema).await? {
return Ok(false);
}
self.memory_catalog_manager
.table_exist(catalog, schema, table)
.await
.context(TableMetadataManagerSnafu)?
.is_some())
}
async fn catalog_names(&self) -> Result<Vec<String>> {
let req = RangeRequest::new().with_prefix(CATALOG_KEY_PREFIX.as_bytes());
let kvs = self
.backend
.range(req)
.await
.context(TableMetadataManagerSnafu)?
.kvs;
let mut catalogs = HashSet::new();
for catalog in kvs {
let catalog_key = String::from_utf8_lossy(catalog.key());
if let Ok(key) = CatalogKey::parse(&catalog_key) {
let _ = catalogs.insert(key.catalog_name);
}
}
Ok(catalogs.into_iter().collect())
self.memory_catalog_manager.catalog_names().await
}
async fn schema_names(&self, catalog_name: &str) -> Result<Vec<String>> {
let req = RangeRequest::new().with_prefix(build_schema_prefix(catalog_name).as_bytes());
let kvs = self
.backend
.range(req)
.await
.context(TableMetadataManagerSnafu)?
.kvs;
let mut schemas = HashSet::new();
for schema in kvs {
let schema_key = String::from_utf8_lossy(schema.key());
if let Ok(key) = SchemaKey::parse(&schema_key) {
let _ = schemas.insert(key.schema_name);
}
}
Ok(schemas.into_iter().collect())
self.memory_catalog_manager.schema_names(catalog_name).await
}
async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result<Vec<String>> {
self.check_catalog_schema_exist(catalog_name, schema_name)
.await?;
let req = RangeRequest::new()
.with_prefix(build_table_regional_prefix(catalog_name, schema_name).as_bytes());
let kvs = self
.backend
.range(req)
self.memory_catalog_manager
.table_names(catalog_name, schema_name)
.await
.context(TableMetadataManagerSnafu)?
.kvs;
let mut tables = HashSet::new();
for table in kvs {
let table_key = String::from_utf8_lossy(table.key());
if let Ok(key) = TableRegionalKey::parse(&table_key) {
let _ = tables.insert(key.table_name);
}
}
Ok(tables.into_iter().collect())
}
async fn register_catalog(&self, name: String) -> Result<bool> {
let key = CatalogKey { catalog_name: name }.to_string();
// TODO(hl): use compare_and_swap to prevent concurrent update
let req = PutRequest::new()
.with_key(key.as_bytes())
.with_value(CatalogValue.as_bytes().context(InvalidCatalogValueSnafu)?);
self.backend
.put(req)
.await
.context(TableMetadataManagerSnafu)?;
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
Ok(false)
self.memory_catalog_manager.register_catalog_sync(name)
}
fn as_any(&self) -> &dyn Any {


@@ -312,6 +312,10 @@ impl RegionAliveKeeper {
}
deadline
}
pub fn table_ident(&self) -> &TableIdent {
&self.table_ident
}
}
#[derive(Debug)]


@@ -29,6 +29,7 @@ mod tests {
use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::{CompareAndPutRequest, PutRequest, RangeRequest};
@@ -155,6 +156,7 @@ mod tests {
node_id,
cached_backend.clone(),
region_alive_keepers.clone(),
Arc::new(TableMetadataManager::new(cached_backend)),
);
catalog_manager.start().await.unwrap();


@@ -22,6 +22,7 @@ client = { path = "../client" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-meta = { path = "../common/meta" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry", features = [
"deadlock_detection",


@@ -21,6 +21,7 @@ use client::client_manager::DatanodeClients;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::prelude::ErrorExt;
use common_meta::key::TableMetadataManager;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
@@ -263,9 +264,10 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let catalog_list = Arc::new(FrontendCatalogManager::new(
cached_meta_backend.clone(),
cached_meta_backend,
cached_meta_backend.clone(),
partition_manager,
datanode_clients,
Arc::new(TableMetadataManager::new(cached_meta_backend)),
));
let plugins: Arc<Plugins> = Default::default();
let state = Arc::new(QueryEngineState::new(


@@ -70,7 +70,7 @@ pub enum Error {
#[snafu(display("Get null from cache, key: {}", key))]
CacheNotGet { key: String, location: Location },
#[snafu(display("Failed to request MetaSrv, source: {}", source))]
#[snafu(display("{source}"))]
MetaSrv {
source: BoxedError,
location: Location,


@@ -24,6 +24,7 @@ use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest};
use crate::table_name::TableName;
#[derive(Debug)]
pub struct TableNameKey<'a> {
@@ -77,6 +78,16 @@ impl TableMetaKey for TableNameKey<'_> {
}
}
impl<'a> From<&'a TableName> for TableNameKey<'a> {
fn from(value: &'a TableName) -> Self {
Self {
catalog: &value.catalog_name,
schema: &value.schema_name,
table: &value.table_name,
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct TableNameValue {
table_id: TableId,
@@ -207,9 +218,18 @@ mod tests {
test_err(b"__table_name/x/000_invalid_schema/z");
test_err(b"__table_name/x/y/000_invalid_table");
let table_name =
TableNameKey::strip_table_name(b"__table_name/my_catalog/my_schema/my_table").unwrap();
assert_eq!(table_name, "my_table");
fn test_ok(table_name: &str) {
assert_eq!(
table_name,
TableNameKey::strip_table_name(
format!("__table_name/my_catalog/my_schema/{}", table_name).as_bytes()
)
.unwrap()
);
}
test_ok("my_table");
test_ok("cpu:metrics");
test_ok(":cpu:metrics");
}
#[test]


@@ -28,6 +28,7 @@ use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::TableMetadataManager;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::ProcedureManagerRef;
@@ -253,8 +254,9 @@ impl Instance {
let catalog_manager = Arc::new(RemoteCatalogManager::new(
engine_manager.clone(),
opts.node_id.context(MissingNodeIdSnafu)?,
kv_backend,
kv_backend.clone(),
region_alive_keepers.clone(),
Arc::new(TableMetadataManager::new(kv_backend)),
));
(
@@ -265,7 +267,6 @@ impl Instance {
}
};
catalog_manager.start().await.context(CatalogSnafu)?;
let factory = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
false,


@@ -50,7 +50,7 @@ openmetrics-parser = "0.4"
partition = { path = "../partition" }
prost.workspace = true
query = { path = "../query" }
regex = "1.6"
regex.workspace = true
script = { path = "../script", features = ["python"], optional = true }
serde = "1.0"
serde_json = "1.0"


@@ -34,6 +34,7 @@ use catalog::{
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::RangeRequest;
use common_meta::rpc::KeyValue;
@@ -54,6 +55,7 @@ pub struct FrontendCatalogManager {
backend_cache_invalidator: KvCacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
table_metadata_manager: TableMetadataManagerRef,
// TODO(LFC): Remove this field.
// DistInstance in FrontendCatalogManager is only used for creating distributed script table now.
@@ -68,12 +70,14 @@ impl FrontendCatalogManager {
backend_cache_invalidator: KvCacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
backend,
backend_cache_invalidator,
partition_manager,
datanode_clients,
table_metadata_manager,
dist_instance: None,
}
}
@@ -415,6 +419,7 @@ impl CatalogManager for FrontendCatalogManager {
TableName::new(catalog, schema, table_name),
table_info,
Arc::new(self.clone()),
self.table_metadata_manager.clone(),
));
Ok(Some(table))
}


@@ -39,6 +39,7 @@ use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::TableMetadataManager;
use common_query::Output;
use common_telemetry::logging::{debug, info};
use common_telemetry::timer;
@@ -149,17 +150,20 @@ impl Instance {
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
let table_metadata_manager = Arc::new(TableMetadataManager::new(meta_backend.clone()));
let mut catalog_manager = FrontendCatalogManager::new(
meta_backend.clone(),
meta_backend.clone(),
partition_manager.clone(),
datanode_clients.clone(),
table_metadata_manager.clone(),
);
let dist_instance = DistInstance::new(
meta_client.clone(),
Arc::new(catalog_manager.clone()),
datanode_clients.clone(),
table_metadata_manager.clone(),
);
let dist_instance = Arc::new(dist_instance);


@@ -34,6 +34,7 @@ use client::Database;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::prelude::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition as MetaPartition, RouteRequest};
@@ -84,6 +85,7 @@ pub struct DistInstance {
meta_client: Arc<MetaClient>,
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
table_metadata_manager: TableMetadataManagerRef,
}
impl DistInstance {
@@ -91,11 +93,13 @@ impl DistInstance {
meta_client: Arc<MetaClient>,
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
meta_client,
catalog_manager,
datanode_clients,
table_metadata_manager,
}
}
@@ -131,6 +135,7 @@ impl DistInstance {
table_name.clone(),
table_info,
self.catalog_manager.clone(),
self.table_metadata_manager.clone(),
));
let request = RegisterTableRequest {


@@ -181,6 +181,7 @@ mod tests {
};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::PutRequest;
@@ -262,6 +263,7 @@ mod tests {
async fn test_split_inserts() {
let backend = prepare_mocked_backend().await;
let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone()));
let table_name = "one_column_partitioning_table";
create_testing_table(&backend, table_name).await;
@@ -270,6 +272,7 @@ mod tests {
Arc::new(MockKvCacheInvalidator::default()),
create_partition_rule_manager().await,
Arc::new(DatanodeClients::default()),
table_metadata_manager,
));
let inserter = DistInserter::new(


@@ -22,7 +22,7 @@ use async_trait::async_trait;
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use client::Database;
use common_error::prelude::BoxedError;
use common_meta::key::TableRouteKey;
use common_meta::key::{TableMetadataManagerRef, TableRouteKey};
use common_meta::rpc::store::{MoveValueRequest, PutRequest};
use common_meta::table_name::TableName;
use common_query::error::Result as QueryResult;
@@ -72,6 +72,8 @@ pub struct DistTable {
table_name: TableName,
table_info: TableInfoRef,
catalog_manager: Arc<FrontendCatalogManager>,
#[allow(unused)]
table_metadata_manager: TableMetadataManagerRef,
}
#[async_trait]
@@ -242,11 +244,13 @@ impl DistTable {
table_name: TableName,
table_info: TableInfoRef,
catalog_manager: Arc<FrontendCatalogManager>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
table_name,
table_info,
catalog_manager,
table_metadata_manager,
}
}


@@ -537,16 +537,16 @@ mod tests {
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
fn new_table_info() -> RawTableInfo {
fn new_table_info(table_name: &TableName) -> RawTableInfo {
RawTableInfo {
ident: TableIdent {
table_id: 0,
version: 0,
},
name: "t".to_string(),
name: table_name.table_name.clone(),
desc: None,
catalog_name: "c".to_string(),
schema_name: "s".to_string(),
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: vec![ColumnSchema::new(
@@ -579,8 +579,9 @@ mod tests {
.build();
meta_client.start(urls).await.unwrap();
let table_info = new_table_info();
let req = CreateRequest::new(TableName::new("c", "s", "t"), &table_info);
let table_name = TableName::new("c", "s", "t");
let table_info = new_table_info(&table_name);
let req = CreateRequest::new(table_name, &table_info);
let res = meta_client.create_route(req).await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
@@ -674,7 +675,7 @@ mod tests {
value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()],
};
let table_name = TableName::new("test_catalog", "test_schema", "test_table");
let table_info = new_table_info();
let table_info = new_table_info(&table_name);
let req = CreateRequest::new(table_name.clone(), &table_info)
.add_partition(p1)
.add_partition(p2);


@@ -29,13 +29,13 @@ etcd-client.workspace = true
futures.workspace = true
h2 = "0.3"
http-body = "0.4"
lazy_static = "1.4"
lazy_static.workspace = true
metrics.workspace = true
once_cell = "1.17"
parking_lot = "0.12"
prost.workspace = true
rand.workspace = true
regex = "1.6"
regex.workspace = true
serde = "1.0"
serde_json = "1.0"
snafu.workspace = true


@@ -15,6 +15,7 @@
use std::sync::Arc;
use client::client_manager::DatanodeClients;
use common_meta::key::TableMetadataManagerRef;
use common_meta::rpc::ddl::{CreateTableTask, DropTableTask};
use common_meta::rpc::router::TableRoute;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
@@ -34,6 +35,7 @@ pub struct DdlManager {
datanode_clients: Arc<DatanodeClients>,
mailbox: MailboxRef,
server_addr: String,
table_metadata_manager: TableMetadataManagerRef,
}
#[derive(Clone)]
@@ -42,6 +44,8 @@ pub(crate) struct DdlContext {
pub(crate) datanode_clients: Arc<DatanodeClients>,
pub(crate) mailbox: MailboxRef,
pub(crate) server_addr: String,
#[allow(unused)]
pub(crate) table_metadata_manager: TableMetadataManagerRef,
}
impl DdlManager {
@@ -51,6 +55,7 @@ impl DdlManager {
datanode_clients: Arc<DatanodeClients>,
mailbox: MailboxRef,
server_addr: String,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
procedure_manager,
@@ -58,6 +63,7 @@ impl DdlManager {
datanode_clients,
mailbox,
server_addr,
table_metadata_manager,
}
}
@@ -67,6 +73,7 @@ impl DdlManager {
datanode_clients: self.datanode_clients.clone(),
mailbox: self.mailbox.clone(),
server_addr: self.server_addr.clone(),
table_metadata_manager: self.table_metadata_manager.clone(),
}
}


@@ -422,6 +422,12 @@ pub enum Error {
source: BoxedError,
location: Location,
},
#[snafu(display("Table metadata manager error: {}", source))]
TableMetadataManager {
source: common_meta::error::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -512,6 +518,7 @@ impl ErrorExt for Error {
Error::TableRouteConversion { source, .. }
| Error::ConvertProtoData { source, .. }
| Error::TableMetadataManager { source, .. }
| Error::ConvertEtcdTxnObject { source, .. } => source.status_code(),
Error::Other { source, .. } => source.status_code(),


@@ -142,12 +142,15 @@ mod tests {
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use common_meta::key::TableMetadataManager;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::keys::StatKey;
use crate::sequence::Sequence;
use crate::service::store::cached_kv::LeaderCachedKvStore;
use crate::service::store::kv::KvBackendAdapter;
use crate::service::store::memory::MemStore;
#[tokio::test]
@@ -168,13 +171,16 @@ mod tests {
let ctx = Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_store,
kv_store: kv_store.clone(),
leader_cached_kv_store,
meta_peer_client,
mailbox,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
))),
};
let handler = PersistStatsHandler::default();


@@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManagerRef;
use common_meta::ClusterId;
use store_api::storage::{RegionId, RegionNumber};
@@ -37,16 +38,20 @@ pub(crate) const REGION_LEASE_SECONDS: u64 = 20;
pub(crate) struct RegionLeaseHandler {
kv_store: KvStoreRef,
region_failover_manager: Option<Arc<RegionFailoverManager>>,
#[allow(unused)]
table_metadata_manager: TableMetadataManagerRef,
}
impl RegionLeaseHandler {
pub(crate) fn new(
kv_store: KvStoreRef,
region_failover_manager: Option<Arc<RegionFailoverManager>>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
kv_store,
region_failover_manager,
table_metadata_manager,
}
}
@@ -149,10 +154,12 @@ impl HeartbeatHandler for RegionLeaseHandler {
#[cfg(test)]
mod test {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::TableMetadataManager;
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::service::store::kv::KvBackendAdapter;
use crate::test_util;
#[tokio::test]
@@ -165,6 +172,9 @@ mod test {
.clone();
let table_name = "my_table";
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)));
let _ = table_routes::tests::prepare_table_global_value(&kv_store, table_name).await;
let table_ident = TableIdent {
@@ -184,7 +194,11 @@ mod test {
region_number: 1,
});
let handler = RegionLeaseHandler::new(kv_store, Some(region_failover_manager));
let handler = RegionLeaseHandler::new(
kv_store,
Some(region_failover_manager),
table_metadata_manager,
);
let req = HeartbeatRequest {
duration_since_epoch: 1234,


@@ -51,12 +51,14 @@ mod tests {
use std::sync::Arc;
use api::v1::meta::{HeartbeatResponse, RequestHeader};
use common_meta::key::TableMetadataManager;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{Context, HeartbeatMailbox, Pushers};
use crate::sequence::Sequence;
use crate::service::store::cached_kv::LeaderCachedKvStore;
use crate::service::store::kv::KvBackendAdapter;
use crate::service::store::memory::MemStore;
#[tokio::test]
@@ -77,13 +79,16 @@ mod tests {
let mut ctx = Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_store,
kv_store: kv_store.clone(),
leader_cached_kv_store,
meta_peer_client,
mailbox,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
))),
};
let req = HeartbeatRequest {


@@ -19,6 +19,7 @@ use std::sync::Arc;
use api::v1::meta::Peer;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::TableMetadataManagerRef;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
@@ -90,6 +91,7 @@ pub struct Context {
pub election: Option<ElectionRef>,
pub skip_all: Arc<AtomicBool>,
pub is_infancy: bool,
pub table_metadata_manager: TableMetadataManagerRef,
}
impl Context {
@@ -145,6 +147,7 @@ pub struct MetaSrv {
metadata_service: MetadataServiceRef,
mailbox: MailboxRef,
ddl_manager: DdlManagerRef,
table_metadata_manager: TableMetadataManagerRef,
}
impl MetaSrv {
@@ -294,6 +297,10 @@ impl MetaSrv {
&self.procedure_manager
}
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
#[inline]
pub fn new_ctx(&self) -> Context {
let server_addr = self.options().server_addr.clone();
@@ -314,6 +321,7 @@ impl MetaSrv {
election,
skip_all,
is_infancy: false,
table_metadata_manager: self.table_metadata_manager.clone(),
}
}
}


@@ -16,6 +16,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use client::client_manager::DatanodeClients;
use common_meta::key::TableMetadataManager;
use common_procedure::local::{LocalManager, ManagerConfig};
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
@@ -39,7 +40,7 @@ use crate::procedure::state_store::MetaStateStore;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore};
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef};
use crate::service::store::memory::MemStore;
// TODO(fys): try use derive_builder macro
@@ -173,6 +174,10 @@ impl MetaSrvBuilder {
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)));
// TODO(weny): considers to modify the default config of procedure manager
let ddl_manager = Arc::new(DdlManager::new(
procedure_manager.clone(),
@@ -180,6 +185,7 @@ impl MetaSrvBuilder {
datanode_clients.unwrap_or_else(|| Arc::new(DatanodeClients::default())),
mailbox.clone(),
options.server_addr.clone(),
table_metadata_manager.clone(),
));
let _ = ddl_manager.try_start();
@@ -204,6 +210,7 @@ impl MetaSrvBuilder {
table: None,
},
lock.clone(),
table_metadata_manager.clone(),
));
Some(
@@ -217,6 +224,7 @@ impl MetaSrvBuilder {
region_failover_handler
.as_ref()
.map(|x| x.region_failover_manager().clone()),
table_metadata_manager.clone(),
);
let group = HeartbeatHandlerGroup::new(pushers);
@@ -254,6 +262,7 @@ impl MetaSrvBuilder {
metadata_service,
mailbox,
ddl_manager,
table_metadata_manager,
})
}
}


@@ -27,6 +27,7 @@ use std::time::Duration;
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use common_meta::ident::TableIdent;
use common_meta::key::TableMetadataManagerRef;
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
@@ -74,6 +75,7 @@ pub(crate) struct RegionFailoverManager {
selector_ctx: SelectorContext,
dist_lock: DistLockRef,
running_procedures: Arc<RwLock<HashSet<RegionFailoverKey>>>,
table_metadata_manager: TableMetadataManagerRef,
}
struct FailoverProcedureGuard {
@@ -94,6 +96,7 @@ impl RegionFailoverManager {
selector: SelectorRef,
selector_ctx: SelectorContext,
dist_lock: DistLockRef,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
mailbox,
@@ -102,6 +105,7 @@ impl RegionFailoverManager {
selector_ctx,
dist_lock,
running_procedures: Arc::new(RwLock::new(HashSet::new())),
table_metadata_manager,
}
}
@@ -111,6 +115,7 @@ impl RegionFailoverManager {
selector: self.selector.clone(),
selector_ctx: self.selector_ctx.clone(),
dist_lock: self.dist_lock.clone(),
table_metadata_manager: self.table_metadata_manager.clone(),
}
}
@@ -228,6 +233,7 @@ pub struct RegionFailoverContext {
pub selector: SelectorRef,
pub selector_ctx: SelectorContext,
pub dist_lock: DistLockRef,
pub table_metadata_manager: TableMetadataManagerRef,
}
/// The state machine of region failover procedure. Driven by the call to `next`.
@@ -374,6 +380,7 @@ mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::TableMetadataManager;
use common_meta::DatanodeId;
use common_procedure::BoxedProcedure;
use rand::prelude::SliceRandom;
@@ -386,6 +393,7 @@ mod tests {
use crate::selector::{Namespace, Selector};
use crate::sequence::Sequence;
use crate::service::mailbox::Channel;
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef};
use crate::service::store::memory::MemStore;
use crate::table_routes;
@@ -468,7 +476,7 @@ mod tests {
}
pub async fn build(self) -> TestingEnv {
let kv_store = Arc::new(MemStore::new()) as _;
let kv_store: KvStoreRef = Arc::new(MemStore::new());
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(Arc::new(MemStore::new()))
@@ -478,6 +486,9 @@ mod tests {
.unwrap();
let table = "my_table";
let table_metadata_manager = Arc::new(TableMetadataManager::new(
KvBackendAdapter::wrap(kv_store.clone()),
));
let (_, table_global_value) =
table_routes::tests::prepare_table_global_value(&kv_store, table).await;
@@ -524,6 +535,7 @@ mod tests {
selector,
selector_ctx,
dist_lock: Arc::new(MemLock::default()),
table_metadata_manager,
},
pushers,
heartbeat_receivers,


@@ -65,6 +65,7 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
"/tables",
meta::TablesHandler {
kv_store: meta_srv.kv_store(),
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);
@@ -72,6 +73,7 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
"/table",
meta::TableHandler {
kv_store: meta_srv.kv_store(),
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);


@@ -17,6 +17,7 @@ use std::collections::HashMap;
use catalog::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, TABLE_GLOBAL_KEY_PREFIX,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::rpc::store::{RangeRequest, RangeResponse};
use common_meta::util;
use snafu::{OptionExt, ResultExt};
@@ -37,10 +38,12 @@ pub struct SchemasHandler {
pub struct TablesHandler {
pub kv_store: KvStoreRef,
pub table_metadata_manager: TableMetadataManagerRef,
}
pub struct TableHandler {
pub kv_store: KvStoreRef,
pub table_metadata_manager: TableMetadataManagerRef,
}
#[async_trait::async_trait]


@@ -12,9 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use common_meta::kv_backend::KvBackend;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::error::MetaSrvSnafu;
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use snafu::ResultExt;
use crate::error::Error;
@@ -24,3 +36,109 @@ pub type ResettableKvStoreRef = Arc<dyn ResettableKvStore>;
pub trait ResettableKvStore: KvBackend<Error = Error> {
fn reset(&self);
}
/// An adaptor to bridge [KvStoreRef] and [KvBackendRef].
pub struct KvBackendAdapter(KvStoreRef);
impl KvBackendAdapter {
pub fn wrap(kv_store: KvStoreRef) -> KvBackendRef {
Arc::new(Self(kv_store))
}
}
#[async_trait]
impl TxnService for KvBackendAdapter {
type Error = common_meta::error::Error;
async fn txn(&self, txn: Txn) -> Result<TxnResponse, Self::Error> {
self.0
.txn(txn)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
}
#[async_trait]
impl KvBackend for KvBackendAdapter {
fn name(&self) -> &str {
self.0.name()
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
self.0
.range(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
self.0
.put(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
self.0
.batch_put(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
self.0
.compare_and_put(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
async fn delete_range(
&self,
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse, Self::Error> {
self.0
.delete_range(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
async fn batch_delete(
&self,
req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse, Self::Error> {
self.0
.batch_delete(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
self.0
.batch_get(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error> {
self.0
.move_value(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
}
fn as_any(&self) -> &dyn Any {
self.0.as_any()
}
}


@@ -14,6 +14,7 @@
use std::sync::Arc;
use common_meta::key::TableMetadataManager;
use common_procedure::local::{LocalManager, ManagerConfig};
use crate::cluster::MetaPeerClientBuilder;
@@ -24,6 +25,7 @@ use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::store::kv::KvBackendAdapter;
use crate::service::store::memory::MemStore;
pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
@@ -49,7 +51,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
let selector_ctx = SelectorContext {
datanode_lease_secs: 10,
server_addr: "127.0.0.1:3002".to_string(),
kv_store,
kv_store: kv_store.clone(),
meta_peer_client,
catalog: None,
schema: None,
@@ -62,5 +64,6 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
selector,
selector_ctx,
Arc::new(MemLock::default()),
Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store))),
))
}


@@ -40,7 +40,7 @@ once_cell = "1.10"
partition = { path = "../partition" }
promql = { path = "../promql" }
promql-parser = "0.1.1"
regex = "1.6"
regex.workspace = true
serde.workspace = true
serde_json = "1.0"
session = { path = "../session" }


@@ -170,7 +170,6 @@ mod tests {
.await
.unwrap(),
);
catalog_manager.start().await.unwrap();
let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
let query_engine = factory.query_engine();


@@ -62,7 +62,7 @@ promql-parser = "0.1.1"
prost.workspace = true
query = { path = "../query" }
rand.workspace = true
regex = "1.6"
regex.workspace = true
rustls = "0.21"
rustls-pemfile = "1.0"
rust-embed = { version = "6.6", features = ["debug-embed"] }


@@ -28,7 +28,7 @@ datafusion.workspace = true
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true
lazy_static = "1.4"
lazy_static.workspace = true
metrics.workspace = true
object-store = { path = "../object-store" }
parquet = { workspace = true, features = ["async"] }


@@ -463,6 +463,12 @@ pub struct TableInfo {
pub type TableInfoRef = Arc<TableInfo>;
impl TableInfo {
pub fn table_id(&self) -> TableId {
self.ident.table_id
}
}
impl TableInfoBuilder {
pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
Self {


@@ -17,7 +17,6 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
use catalog::remote::RemoteCatalogManager;
use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
@@ -33,6 +32,7 @@ use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metadata_service::{DefaultMetadataService, MetadataService};
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
use meta_srv::mocks::MockInfo;
use meta_srv::service::store::kv::KvStoreRef;
@@ -128,7 +128,16 @@ impl GreptimeDbClusterBuilder {
..Default::default()
};
meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await
let mock =
meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await;
let metadata_service = DefaultMetadataService::new(mock.meta_srv.kv_store());
metadata_service
.create_schema("another_catalog", "another_schema", true)
.await
.unwrap();
mock
}
async fn build_datanodes(
@@ -208,15 +217,6 @@ impl GreptimeDbClusterBuilder {
if let Some(heartbeat) = heartbeat.as_ref() {
heartbeat.start().await.unwrap();
}
// create another catalog and schema for testing
instance
.catalog_manager()
.as_any()
.downcast_ref::<RemoteCatalogManager>()
.unwrap()
.create_catalog_and_schema("another_catalog", "another_schema")
.await
.unwrap();
(instance, heartbeat)
}


@@ -369,6 +369,8 @@ pub async fn create_test_table(
pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
create_test_table(
instance.catalog_manager(),
instance.sql_handler(),
@@ -380,7 +382,6 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
let frontend_instance = FeInstance::try_new_standalone(instance.clone())
.await
.unwrap();
instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
}


@@ -20,6 +20,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use catalog::RegisterSchemaRequest;
use common_meta::key::TableMetadataManagerRef;
use common_test_util::temp_dir::TempDir;
use datanode::instance::Instance as DatanodeInstance;
use frontend::instance::Instance;
@@ -49,6 +50,11 @@ impl MockDistributedInstance {
pub fn datanodes(&self) -> &HashMap<u64, Arc<DatanodeInstance>> {
&self.0.datanode_instances
}
#[allow(unused)]
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
self.0.meta_srv.table_metadata_manager()
}
}
pub struct MockStandaloneInstance {
@@ -73,6 +79,8 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
.await
.unwrap();
dn_instance.start().await.unwrap();
assert!(dn_instance
.catalog_manager()
.register_catalog("another_catalog".to_string())
@@ -88,7 +96,6 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
.await
.is_ok());
dn_instance.start().await.unwrap();
if let Some(heartbeat) = heartbeat {
heartbeat.start().await.unwrap();
};


@@ -344,6 +344,7 @@ async fn run_region_failover_procedure(
table: None,
},
dist_lock: meta_srv.lock().clone(),
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));