mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
* feat: Implement `table_info()`` for `DistTable` (#536) * Update src/catalog/src/error.rs Co-authored-by: Yingwen <1405012107@qq.com> Co-authored-by: luofucong <luofucong@greptime.com> Co-authored-by: Yingwen <1405012107@qq.com>
This commit is contained in:
@@ -185,8 +185,8 @@ pub enum Error {
|
||||
source: meta_client::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid table schema in catalog, source: {:?}", source))]
|
||||
InvalidSchemaInCatalog {
|
||||
#[snafu(display("Invalid table info in catalog, source: {}", source))]
|
||||
InvalidTableInfoInCatalog {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
@@ -233,7 +233,7 @@ impl ErrorExt for Error {
|
||||
Error::SystemCatalogTableScan { source } => source.status_code(),
|
||||
Error::SystemCatalogTableScanExec { source } => source.status_code(),
|
||||
Error::InvalidTableSchema { source, .. } => source.status_code(),
|
||||
Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected,
|
||||
Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected,
|
||||
Error::Internal { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,10 +250,7 @@ impl RemoteCatalogManager {
|
||||
let table_ref = self.open_or_create_table(&table_key, &table_value).await?;
|
||||
schema.register_table(table_key.table_name.to_string(), table_ref)?;
|
||||
info!("Registered table {}", &table_key.table_name);
|
||||
if table_value.id > max_table_id {
|
||||
info!("Max table id: {} -> {}", max_table_id, table_value.id);
|
||||
max_table_id = table_value.id;
|
||||
}
|
||||
max_table_id = max_table_id.max(table_value.table_id());
|
||||
table_num += 1;
|
||||
}
|
||||
info!(
|
||||
@@ -311,9 +308,10 @@ impl RemoteCatalogManager {
|
||||
..
|
||||
} = table_key;
|
||||
|
||||
let table_id = table_value.table_id();
|
||||
|
||||
let TableGlobalValue {
|
||||
id,
|
||||
meta,
|
||||
table_info,
|
||||
regions_id_map,
|
||||
..
|
||||
} = table_value;
|
||||
@@ -322,14 +320,17 @@ impl RemoteCatalogManager {
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
table_id: *id,
|
||||
table_id,
|
||||
};
|
||||
match self
|
||||
.engine
|
||||
.open_table(&context, request)
|
||||
.await
|
||||
.with_context(|_| OpenTableSnafu {
|
||||
table_info: format!("{}.{}.{}, id:{}", catalog_name, schema_name, table_name, id,),
|
||||
table_info: format!(
|
||||
"{}.{}.{}, id:{}",
|
||||
catalog_name, schema_name, table_name, table_id
|
||||
),
|
||||
})? {
|
||||
Some(table) => {
|
||||
info!(
|
||||
@@ -344,6 +345,7 @@ impl RemoteCatalogManager {
|
||||
catalog_name, schema_name, table_name
|
||||
);
|
||||
|
||||
let meta = &table_info.meta;
|
||||
let schema = meta
|
||||
.schema
|
||||
.clone()
|
||||
@@ -353,7 +355,7 @@ impl RemoteCatalogManager {
|
||||
schema: meta.schema.clone(),
|
||||
})?;
|
||||
let req = CreateTableRequest {
|
||||
id: *id,
|
||||
id: table_id,
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
@@ -371,7 +373,7 @@ impl RemoteCatalogManager {
|
||||
.context(CreateTableSnafu {
|
||||
table_info: format!(
|
||||
"{}.{}.{}, id:{}",
|
||||
&catalog_name, &schema_name, &table_name, id
|
||||
&catalog_name, &schema_name, &table_name, table_id
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::{RawTableMeta, TableId, TableVersion};
|
||||
use table::metadata::{RawTableInfo, TableId, TableVersion};
|
||||
|
||||
use crate::consts::{
|
||||
CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX,
|
||||
@@ -128,15 +128,18 @@ impl TableGlobalKey {
|
||||
/// table id, table meta(schema...), region id allocation across datanodes.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct TableGlobalValue {
|
||||
/// Table id is the same across all datanodes.
|
||||
pub id: TableId,
|
||||
/// Id of datanode that created the global table info kv. only for debugging.
|
||||
pub node_id: u64,
|
||||
// TODO(LFC): Maybe remove it?
|
||||
/// Allocation of region ids across all datanodes.
|
||||
pub regions_id_map: HashMap<u64, Vec<u32>>,
|
||||
// TODO(LFC): Too much for assembling the table schema that DistTable needs, find another way.
|
||||
pub meta: RawTableMeta,
|
||||
pub table_info: RawTableInfo,
|
||||
}
|
||||
|
||||
impl TableGlobalValue {
|
||||
pub fn table_id(&self) -> TableId {
|
||||
self.table_info.ident.table_id
|
||||
}
|
||||
}
|
||||
|
||||
/// Table regional info that varies between datanode, so it contains a `node_id` field.
|
||||
@@ -279,6 +282,7 @@ define_catalog_value!(
|
||||
mod tests {
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
|
||||
use table::metadata::{RawTableMeta, TableIdent, TableType};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -339,11 +343,23 @@ mod tests {
|
||||
region_numbers: vec![1],
|
||||
};
|
||||
|
||||
let table_info = RawTableInfo {
|
||||
ident: TableIdent {
|
||||
table_id: 42,
|
||||
version: 1,
|
||||
},
|
||||
name: "table_1".to_string(),
|
||||
desc: Some("blah".to_string()),
|
||||
catalog_name: "catalog_1".to_string(),
|
||||
schema_name: "schema_1".to_string(),
|
||||
meta,
|
||||
table_type: TableType::Base,
|
||||
};
|
||||
|
||||
let value = TableGlobalValue {
|
||||
id: 42,
|
||||
node_id: 0,
|
||||
regions_id_map: HashMap::from([(0, vec![1, 2, 3])]),
|
||||
meta,
|
||||
table_info,
|
||||
};
|
||||
let serialized = serde_json::to_string(&value).unwrap();
|
||||
let deserialized = TableGlobalValue::parse(&serialized).unwrap();
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::any::Any;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::error::{InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu};
|
||||
use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu};
|
||||
use catalog::remote::{Kv, KvBackendRef};
|
||||
use catalog::{
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest,
|
||||
@@ -276,17 +276,16 @@ impl SchemaProvider for FrontendSchemaProvider {
|
||||
let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
|
||||
let table = Arc::new(DistTable {
|
||||
let table = Arc::new(DistTable::new(
|
||||
table_name,
|
||||
schema: Arc::new(
|
||||
val.meta
|
||||
.schema
|
||||
Arc::new(
|
||||
val.table_info
|
||||
.try_into()
|
||||
.context(InvalidSchemaInCatalogSnafu)?,
|
||||
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?,
|
||||
),
|
||||
table_routes,
|
||||
datanode_clients,
|
||||
});
|
||||
));
|
||||
Ok(Some(table as _))
|
||||
})
|
||||
})
|
||||
|
||||
@@ -37,7 +37,7 @@ use sql::statements::create::Partitions;
|
||||
use sql::statements::sql_value_to_value;
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::ast::Value as SqlValue;
|
||||
use table::metadata::RawTableMeta;
|
||||
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
|
||||
|
||||
use crate::catalog::FrontendCatalogManager;
|
||||
use crate::datanode::DatanodeClients;
|
||||
@@ -274,11 +274,23 @@ fn create_table_global_value(
|
||||
created_on: DateTime::default(),
|
||||
};
|
||||
|
||||
let table_info = RawTableInfo {
|
||||
ident: TableIdent {
|
||||
table_id: table_route.table.id as u32,
|
||||
version: 0,
|
||||
},
|
||||
name: table_name.table_name.clone(),
|
||||
desc: create_table.desc.clone(),
|
||||
catalog_name: table_name.catalog_name.clone(),
|
||||
schema_name: table_name.schema_name.clone(),
|
||||
meta,
|
||||
table_type: TableType::Base,
|
||||
};
|
||||
|
||||
Ok(TableGlobalValue {
|
||||
id: table_route.table.id as u32,
|
||||
node_id,
|
||||
regions_id_map: HashMap::new(),
|
||||
meta,
|
||||
table_info,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -55,10 +55,10 @@ pub(crate) mod scan;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DistTable {
|
||||
pub(crate) table_name: TableName,
|
||||
pub(crate) schema: SchemaRef,
|
||||
pub(crate) table_routes: Arc<TableRoutes>,
|
||||
pub(crate) datanode_clients: Arc<DatanodeClients>,
|
||||
table_name: TableName,
|
||||
table_info: TableInfoRef,
|
||||
table_routes: Arc<TableRoutes>,
|
||||
datanode_clients: Arc<DatanodeClients>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -68,11 +68,11 @@ impl Table for DistTable {
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
self.table_info.meta.schema.clone()
|
||||
}
|
||||
|
||||
fn table_info(&self) -> TableInfoRef {
|
||||
unimplemented!()
|
||||
self.table_info.clone()
|
||||
}
|
||||
|
||||
async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
|
||||
@@ -133,6 +133,20 @@ impl Table for DistTable {
|
||||
}
|
||||
|
||||
impl DistTable {
|
||||
pub(crate) fn new(
|
||||
table_name: TableName,
|
||||
table_info: TableInfoRef,
|
||||
table_routes: Arc<TableRoutes>,
|
||||
datanode_clients: Arc<DatanodeClients>,
|
||||
) -> Self {
|
||||
Self {
|
||||
table_name,
|
||||
table_info,
|
||||
table_routes,
|
||||
datanode_clients,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(LFC): Finding regions now seems less efficient, should be further looked into.
|
||||
fn find_regions(
|
||||
&self,
|
||||
@@ -477,6 +491,7 @@ mod test {
|
||||
use sql::parser::ParserContext;
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::dialect::GenericDialect;
|
||||
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
|
||||
use table::TableRef;
|
||||
use tempdir::TempDir;
|
||||
|
||||
@@ -496,11 +511,22 @@ mod test {
|
||||
ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
|
||||
];
|
||||
let schema = Arc::new(Schema::new(column_schemas.clone()));
|
||||
let meta = TableMetaBuilder::default()
|
||||
.schema(schema)
|
||||
.primary_key_indices(vec![])
|
||||
.next_column_id(1)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_info = TableInfoBuilder::default()
|
||||
.name(&table_name.table_name)
|
||||
.meta(meta)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default())));
|
||||
let table = DistTable {
|
||||
table_name: table_name.clone(),
|
||||
schema,
|
||||
table_info: Arc::new(table_info),
|
||||
table_routes: table_routes.clone(),
|
||||
datanode_clients: Arc::new(DatanodeClients::new()),
|
||||
};
|
||||
@@ -862,9 +888,20 @@ mod test {
|
||||
insert_testing_data(&table_name, instance.clone(), numbers, start_ts).await;
|
||||
}
|
||||
|
||||
let meta = TableMetaBuilder::default()
|
||||
.schema(schema)
|
||||
.primary_key_indices(vec![])
|
||||
.next_column_id(1)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_info = TableInfoBuilder::default()
|
||||
.name(&table_name.table_name)
|
||||
.meta(meta)
|
||||
.build()
|
||||
.unwrap();
|
||||
DistTable {
|
||||
table_name,
|
||||
schema,
|
||||
table_info: Arc::new(table_info),
|
||||
table_routes,
|
||||
datanode_clients,
|
||||
}
|
||||
@@ -968,9 +1005,21 @@ mod test {
|
||||
ConcreteDataType::int32_datatype(),
|
||||
true,
|
||||
)]));
|
||||
let table_name = TableName::new("greptime", "public", "foo");
|
||||
let meta = TableMetaBuilder::default()
|
||||
.schema(schema)
|
||||
.primary_key_indices(vec![])
|
||||
.next_column_id(1)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_info = TableInfoBuilder::default()
|
||||
.name(&table_name.table_name)
|
||||
.meta(meta)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table = DistTable {
|
||||
table_name: TableName::new("greptime", "public", "foo"),
|
||||
schema,
|
||||
table_name,
|
||||
table_info: Arc::new(table_info),
|
||||
table_routes: Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))),
|
||||
datanode_clients: Arc::new(DatanodeClients::new()),
|
||||
};
|
||||
|
||||
@@ -184,8 +184,7 @@ async fn fetch_tables(
|
||||
}
|
||||
let tv = tv.unwrap();
|
||||
|
||||
let table_id = tv.id as u64;
|
||||
let tr_key = TableRouteKey::with_table_global_key(table_id, &tk);
|
||||
let tr_key = TableRouteKey::with_table_global_key(tv.table_id() as u64, &tk);
|
||||
let tr = get_table_route_value(kv_store, &tr_key).await?;
|
||||
|
||||
tables.push((tv, tr));
|
||||
|
||||
Reference in New Issue
Block a user