refactor!: trying to replace TableGlobalValue, part 2 (#1985)

* refactor!: using the new table metadata values

* fix: resolve PR comments

* fix: resolve PR comments

* fix: resolve PR comments
This commit is contained in:
LFC
2023-07-19 20:01:43 +08:00
committed by GitHub
parent 2ef0d06cdb
commit 172febb1af
41 changed files with 936 additions and 1499 deletions

View File

@@ -17,28 +17,23 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_catalog::consts::MITO_ENGINE;
use common_meta::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use common_meta::helper::{CatalogKey, SchemaKey};
use common_meta::ident::TableIdent;
use common_meta::key::datanode_table::DatanodeTableValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::{PutRequest, RangeRequest};
use common_meta::rpc::KeyValue;
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{error, info, warn};
use metrics::increment_gauge;
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
use table::engine::{EngineContext, TableReference};
use table::engine::EngineContext;
use table::requests::OpenTableRequest;
use table::TableRef;
use tokio::sync::Mutex;
use crate::error::{
InvalidCatalogValueSnafu, OpenTableSnafu, ParallelOpenTableSnafu, Result,
TableEngineNotFoundSnafu, TableExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnimplementedSnafu,
OpenTableSnafu, ParallelOpenTableSnafu, Result, TableEngineNotFoundSnafu, TableExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnimplementedSnafu,
};
use crate::local::MemoryCatalogManager;
use crate::remote::region_alive_keeper::RegionAliveKeepers;
@@ -77,85 +72,25 @@ impl RemoteCatalogManager {
}
}
async fn iter_remote_catalogs(&self) -> Result<Vec<CatalogKey>> {
let catalog_range_prefix = build_catalog_prefix();
let req = RangeRequest::new().with_prefix(catalog_range_prefix.as_bytes());
let kvs = self
.backend
.range(req)
.await
.context(TableMetadataManagerSnafu)?
.kvs;
let catalogs = kvs
.into_iter()
.filter_map(|kv| {
let catalog_key = String::from_utf8_lossy(kv.key());
match CatalogKey::parse(&catalog_key) {
Ok(x) => Some(x),
Err(e) => {
error!(e; "Ignore invalid catalog key {:?}", catalog_key);
None
}
}
})
.collect();
Ok(catalogs)
}
/// Fetch catalogs/schemas/tables from remote catalog manager along with max table id allocated.
async fn initiate_catalogs(&self) -> Result<()> {
let catalogs = self.iter_remote_catalogs().await?;
let mut joins = Vec::new();
for CatalogKey { catalog_name } in catalogs {
info!("Fetch catalog from metasrv: {}", catalog_name);
let tables = self
.table_metadata_manager
.datanode_table_manager()
.tables(self.node_id)
.await
.context(TableMetadataManagerSnafu)?;
let node_id = self.node_id;
let backend = self.backend.clone();
let engine_manager = self.engine_manager.clone();
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
joins.push(self.initiate_schemas(node_id, backend, engine_manager, catalog_name));
}
futures::future::try_join_all(joins).await?;
Ok(())
}
fn build_schema_key(&self, catalog_name: String, schema_name: String) -> SchemaKey {
SchemaKey {
catalog_name,
schema_name,
}
}
/// Initiates all tables inside the catalog by fetching data from metasrv.
/// Return maximum table id in the schema.
async fn initiate_tables(
&self,
node_id: u64,
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
catalog_name: String,
schema_name: String,
) -> Result<()> {
info!("initializing tables in {}.{}", catalog_name, schema_name);
let kvs = iter_remote_tables(node_id, &backend, &catalog_name, &schema_name).await?;
let joins = kvs
let joins = tables
.into_iter()
.map(|(_, table_value)| {
let engine_manager = engine_manager.clone();
.map(|datanode_table_value| {
let engine_manager = self.engine_manager.clone();
let memory_catalog_manager = self.memory_catalog_manager.clone();
let table_metadata_manager = self.table_metadata_manager.clone();
let table_id = table_value.table_id();
common_runtime::spawn_bg(async move {
let table_id = datanode_table_value.table_id;
if let Err(e) = open_and_register_table(
node_id,
engine_manager,
&table_value,
datanode_table_value,
memory_catalog_manager,
table_metadata_manager,
)
@@ -170,261 +105,44 @@ impl RemoteCatalogManager {
})
.collect::<Vec<_>>();
futures::future::try_join_all(joins)
let _ = futures::future::try_join_all(joins)
.await
.context(ParallelOpenTableSnafu)?;
Ok(())
}
/// Initiates all schemas inside the catalog by fetching data from metasrv.
/// Return maximum table id in the catalog.
async fn initiate_schemas(
&self,
node_id: u64,
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
catalog_name: String,
) -> Result<()> {
let schemas = iter_remote_schemas(&backend, &catalog_name).await?;
let mut joins = Vec::new();
for SchemaKey {
fn build_schema_key(&self, catalog_name: String, schema_name: String) -> SchemaKey {
SchemaKey {
catalog_name,
schema_name,
} in schemas
{
info!(
"Fetch schema from metasrv: {}.{}",
&catalog_name, &schema_name
);
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
let backend = backend.clone();
let engine_manager = engine_manager.clone();
joins.push(self.initiate_tables(
node_id,
backend,
engine_manager,
catalog_name,
schema_name,
));
}
futures::future::try_join_all(joins).await?;
Ok(())
}
async fn register_table(
&self,
catalog_name: String,
schema_name: String,
table_name: String,
table: TableRef,
) -> Result<Option<TableRef>> {
let table_info = table.table_info();
let table_version = table_info.ident.version;
let table_value = TableRegionalValue {
table_id: Some(table.table_info().ident.table_id),
version: table_version,
regions_ids: table.table_info().meta.region_numbers.clone(),
engine_name: Some(table_info.meta.engine.clone()),
};
let table_key = self
.build_regional_table_key(catalog_name, schema_name, table_name)
.to_string();
let req = PutRequest::new()
.with_key(table_key.as_bytes())
.with_value(table_value.as_bytes().context(InvalidCatalogValueSnafu)?);
self.backend
.put(req)
.await
.context(TableMetadataManagerSnafu)?;
debug!(
"Successfully set catalog table entry, key: {}, table value: {:?}",
table_key, table_value
);
// TODO(hl): retrieve prev table info using cas
Ok(None)
}
async fn deregister_table(
&self,
catalog_name: String,
schema_name: String,
table_name: String,
) -> Result<Option<TableRef>> {
let table_key = self
.build_regional_table_key(
catalog_name.clone(),
schema_name.clone(),
table_name.clone(),
)
.to_string();
let engine_opt = self
.backend
.get(table_key.as_bytes())
.await
.context(TableMetadataManagerSnafu)?
.map(|KeyValue { key: _, value: v }| {
let TableRegionalValue {
table_id,
engine_name,
..
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
Ok(engine_name.and_then(|name| table_id.map(|id| (name, id))))
})
.transpose()?
.flatten();
let Some((engine_name, table_id)) = engine_opt else {
warn!("Cannot find table id and engine name for {table_key}");
return Ok(None);
};
self.backend
.delete(table_key.as_bytes(), false)
.await
.context(TableMetadataManagerSnafu)?;
debug!(
"Successfully deleted catalog table entry, key: {}",
table_key
);
// deregistering table does not necessarily mean dropping the table
let table = self
.engine_manager
.engine(&engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?
.get_table(&EngineContext {}, table_id)
.with_context(|_| {
let reference = TableReference {
catalog: &catalog_name,
schema: &schema_name,
table: &table_name,
};
OpenTableSnafu {
table_info: reference.to_string(),
}
})?;
Ok(table)
}
fn build_regional_table_key(
&self,
catalog_name: String,
schema_name: String,
table_name: String,
) -> TableRegionalKey {
TableRegionalKey {
catalog_name,
schema_name,
table_name,
node_id: self.node_id,
}
}
}
async fn iter_remote_schemas<'a>(
backend: &'a KvBackendRef,
catalog_name: &'a str,
) -> Result<Vec<SchemaKey>> {
let schema_prefix = build_schema_prefix(catalog_name);
let req = RangeRequest::new().with_prefix(schema_prefix.as_bytes());
let kvs = backend
.range(req)
.await
.context(TableMetadataManagerSnafu)?
.kvs;
let schemas = kvs
.into_iter()
.filter_map(|kv| {
let schema_key = String::from_utf8_lossy(kv.key());
match SchemaKey::parse(&schema_key) {
Ok(x) => Some(x),
Err(e) => {
warn!("Ignore invalid schema key {:?}: {e}", schema_key);
None
}
}
})
.collect();
Ok(schemas)
}
/// Iterate over all table entries on metasrv
async fn iter_remote_tables<'a>(
node_id: u64,
backend: &'a KvBackendRef,
catalog_name: &'a str,
schema_name: &'a str,
) -> Result<Vec<(TableGlobalKey, TableGlobalValue)>> {
let table_prefix = build_table_global_prefix(catalog_name, schema_name);
let req = RangeRequest::new().with_prefix(table_prefix.as_bytes());
let kvs = backend
.range(req)
.await
.context(TableMetadataManagerSnafu)?
.kvs;
let mut tables = Vec::with_capacity(kvs.len());
for kv in kvs {
let tgk = &String::from_utf8_lossy(kv.key());
let Ok(table_key) = TableGlobalKey::parse(tgk) else {
warn!("Ignore invalid table global key {:?}", tgk);
continue;
};
let Ok(table_value) = TableGlobalValue::from_bytes(kv.value()) else {
warn!("Ignore invalid table global value {:?}", String::from_utf8_lossy(kv.value()));
continue;
};
info!("Found catalog table entry, key: {table_key}, value: {table_value:?}");
// metasrv has allocated region ids to current datanode
if table_value
.regions_id_map
.get(&node_id)
.map(|v| !v.is_empty())
.unwrap_or(false)
{
tables.push((table_key, table_value))
}
}
Ok(tables)
}
async fn open_and_register_table(
node_id: u64,
engine_manager: TableEngineManagerRef,
table_value: &TableGlobalValue,
datanode_table_value: DatanodeTableValue,
memory_catalog_manager: Arc<MemoryCatalogManager>,
_table_metadata_manager: TableMetadataManagerRef,
table_metadata_manager: TableMetadataManagerRef,
) -> Result<()> {
let context = EngineContext {};
let table_id = table_value.table_id();
let TableGlobalValue {
table_info,
regions_id_map,
..
} = table_value;
let table_id = datanode_table_value.table_id;
let region_numbers = datanode_table_value.regions;
let table_info_value = table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.context(TableNotFoundSnafu {
table_info: format!("table id: {table_id}"),
})?;
let table_info = &table_info_value.table_info;
let catalog_name = table_info.catalog_name.clone();
let schema_name = table_info.schema_name.clone();
let table_name = table_info.name.clone();
// unwrap safety: checked in yielding this table when `iter_remote_tables`
let region_numbers = regions_id_map.get(&node_id).unwrap();
let request = OpenTableRequest {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
@@ -511,17 +229,11 @@ impl CatalogManager for RemoteCatalogManager {
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let catalog = request.catalog.clone();
let schema = request.schema.clone();
let table_name = request.table_name.clone();
let table = request.table.clone();
let registered = self.memory_catalog_manager.register_table_sync(request)?;
if registered {
self.register_table(catalog, schema, table_name, table.clone())
.await?;
let table_info = table.table_info();
let table_ident = TableIdent {
catalog: table_info.catalog_name.clone(),
@@ -544,13 +256,6 @@ impl CatalogManager for RemoteCatalogManager {
.table(&request.catalog, &request.schema, &request.table_name)
.await? else { return Ok(()) };
self.deregister_table(
request.catalog.clone(),
request.schema.clone(),
request.table_name.clone(),
)
.await?;
let table_info = table.table_info();
let table_ident = TableIdent {
catalog: request.catalog.clone(),