diff --git a/Cargo.lock b/Cargo.lock index 85f85fb66a..cc28f66fab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2000,6 +2000,7 @@ dependencies = [ "catalog", "client", "common-base", + "common-catalog", "common-error", "common-grpc", "common-query", @@ -2014,10 +2015,12 @@ dependencies = [ "datatypes", "futures", "itertools", + "meta-client", "openmetrics-parser", "prost 0.11.0", "query", "serde", + "serde_json", "servers", "snafu", "sql", diff --git a/src/api/greptime/v1/insert.proto b/src/api/greptime/v1/insert.proto index 0b2b6d6343..0e173723a6 100644 --- a/src/api/greptime/v1/insert.proto +++ b/src/api/greptime/v1/insert.proto @@ -9,6 +9,6 @@ message InsertBatch { uint32 row_count = 2; } -message RegionId { - uint64 id = 1; +message RegionNumber { + uint32 id = 1; } diff --git a/src/api/src/serde.rs b/src/api/src/serde.rs index f887dab918..2e65330b95 100644 --- a/src/api/src/serde.rs +++ b/src/api/src/serde.rs @@ -1,7 +1,7 @@ pub use prost::DecodeError; use prost::Message; -use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionId, SelectResult}; +use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionNumber, SelectResult}; macro_rules! impl_convert_with_bytes { ($data_type: ty) => { @@ -24,7 +24,7 @@ macro_rules! impl_convert_with_bytes { impl_convert_with_bytes!(InsertBatch); impl_convert_with_bytes!(SelectResult); impl_convert_with_bytes!(PhysicalPlanNode); -impl_convert_with_bytes!(RegionId); +impl_convert_with_bytes!(RegionNumber); #[cfg(test)] mod tests { @@ -130,10 +130,10 @@ mod tests { #[test] fn test_convert_region_id() { - let region_id = RegionId { id: 12 }; + let region_id = RegionNumber { id: 12 }; let bytes: Vec = region_id.into(); - let region_id: RegionId = bytes.deref().try_into().unwrap(); + let region_id: RegionNumber = bytes.deref().try_into().unwrap(); assert_eq!(12, region_id.id); } diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index d4ad62fd21..bb251aed57 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -155,6 +155,25 @@ pub enum Error { #[snafu(display("Failed to parse table id from metasrv, data: {:?}", data))] ParseTableId { data: String, backtrace: Backtrace }, + + #[snafu(display("Failed to deserialize partition rule from string: {:?}", data))] + DeserializePartitionRule { + data: String, + source: serde_json::error::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Invalid table schema in catalog, source: {:?}", source))] + InvalidSchemaInCatalog { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Catalog internal error: {}", source))] + Internal { + #[snafu(backtrace)] + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -194,6 +213,9 @@ impl ErrorExt for Error { Error::BumpTableId { .. } | Error::ParseTableId { .. } => { StatusCode::StorageUnavailable } + Error::DeserializePartitionRule { .. } => StatusCode::Unexpected, + Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected, + Error::Internal { source, .. } => source.status_code(), } } diff --git a/src/catalog/src/local.rs b/src/catalog/src/local.rs index 087d86f01b..33cda37275 100644 --- a/src/catalog/src/local.rs +++ b/src/catalog/src/local.rs @@ -3,5 +3,5 @@ pub mod memory; pub use manager::LocalCatalogManager; pub use memory::{ - new_memory_catalog_list, MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider, + new_memory_catalog_list, MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider, }; diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index e678e9e8dd..3aba2ed3a0 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -25,7 +25,7 @@ use crate::error::{ SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu, }; -use crate::local::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use crate::system::{ decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX, VALUE_INDEX, @@ -40,7 +40,7 @@ use crate::{ /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. pub struct LocalCatalogManager { system: Arc, - catalogs: Arc, + catalogs: Arc, engine: TableEngineRef, next_table_id: AtomicU32, init_lock: Mutex, diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index f195b07718..fb04c59599 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -1,23 +1,94 @@ use std::any::Any; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::sync::RwLock; +use common_catalog::consts::MIN_USER_TABLE_ID; +use snafu::OptionExt; +use table::metadata::TableId; use table::TableRef; -use crate::error::{Result, TableExistsSnafu}; +use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; use crate::schema::SchemaProvider; -use crate::{CatalogList, CatalogProvider, CatalogProviderRef, SchemaProviderRef}; +use crate::{ + CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSystemTableRequest, + RegisterTableRequest, SchemaProviderRef, +}; /// Simple in-memory list of catalogs -#[derive(Default)] -pub struct MemoryCatalogList { +pub struct MemoryCatalogManager { /// Collection of catalogs containing schemas and ultimately Tables pub catalogs: RwLock>, + pub table_id: AtomicU32, } -impl MemoryCatalogList { +impl Default for MemoryCatalogManager { + fn default() -> Self { + let manager = Self { + table_id: AtomicU32::new(MIN_USER_TABLE_ID), + catalogs: Default::default(), + }; + let default_catalog = Arc::new(MemoryCatalogProvider::new()); + manager + .register_catalog("greptime".to_string(), default_catalog.clone()) + .unwrap(); + default_catalog + .register_schema("public".to_string(), Arc::new(MemorySchemaProvider::new())) + .unwrap(); + manager + } +} + +#[async_trait::async_trait] +impl CatalogManager for MemoryCatalogManager { + async fn start(&self) -> Result<()> { + self.table_id.store(MIN_USER_TABLE_ID, Ordering::Relaxed); + Ok(()) + } + + async fn next_table_id(&self) -> Result { + Ok(self.table_id.fetch_add(1, Ordering::Relaxed)) + } + + async fn register_table(&self, request: RegisterTableRequest) -> Result { + let catalogs = self.catalogs.write().unwrap(); + let catalog = catalogs + .get(&request.catalog) + .context(CatalogNotFoundSnafu { + catalog_name: &request.catalog, + })? + .clone(); + let schema = catalog + .schema(&request.schema)? + .with_context(|| SchemaNotFoundSnafu { + schema_info: format!("{}.{}", &request.catalog, &request.schema), + })?; + schema + .register_table(request.table_name, request.table) + .map(|v| if v.is_some() { 0 } else { 1 }) + } + + async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> { + unimplemented!() + } + + fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Result> { + let c = self.catalogs.read().unwrap(); + let catalog = if let Some(c) = c.get(catalog) { + c.clone() + } else { + return Ok(None); + }; + match catalog.schema(schema)? { + None => Ok(None), + Some(s) => s.table(table_name), + } + } +} + +impl MemoryCatalogManager { /// Registers a catalog and return `None` if no catalog with the same name was already /// registered, or `Some` with the previously registered catalog. pub fn register_catalog_if_absent( @@ -37,7 +108,7 @@ impl MemoryCatalogList { } } -impl CatalogList for MemoryCatalogList { +impl CatalogList for MemoryCatalogManager { fn as_any(&self) -> &dyn Any { self } @@ -162,8 +233,8 @@ impl SchemaProvider for MemorySchemaProvider { } /// Create a memory catalog list contains a numbers table for test -pub fn new_memory_catalog_list() -> Result> { - Ok(Arc::new(MemoryCatalogList::default())) +pub fn new_memory_catalog_list() -> Result> { + Ok(Arc::new(MemoryCatalogManager::default())) } #[cfg(test)] @@ -178,23 +249,11 @@ mod tests { #[test] fn test_new_memory_catalog_list() { let catalog_list = new_memory_catalog_list().unwrap(); + let default_catalog = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap().unwrap(); - assert!(catalog_list - .catalog(DEFAULT_CATALOG_NAME) - .unwrap() - .is_none()); - let default_catalog = Arc::new(MemoryCatalogProvider::default()); - catalog_list - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog.clone()) - .unwrap(); - - assert!(default_catalog + let default_schema = default_catalog .schema(DEFAULT_SCHEMA_NAME) .unwrap() - .is_none()); - let default_schema = Arc::new(MemorySchemaProvider::default()); - default_catalog - .register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone()) .unwrap(); default_schema @@ -203,7 +262,6 @@ mod tests { let table = default_schema.table("numbers").unwrap(); assert!(table.is_some()); - assert!(default_schema.table("not_exists").unwrap().is_none()); } @@ -229,7 +287,7 @@ mod tests { #[test] pub fn test_register_if_absent() { - let list = MemoryCatalogList::default(); + let list = MemoryCatalogManager::default(); assert!(list .register_catalog_if_absent( "test_catalog".to_string(), @@ -241,6 +299,8 @@ mod tests { Arc::new(MemoryCatalogProvider::new()), ) .unwrap(); - list.as_any().downcast_ref::().unwrap(); + list.as_any() + .downcast_ref::() + .unwrap(); } } diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 9ae5943b6a..2af8b6fcb4 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -312,6 +312,7 @@ fn build_schema_for_tables() -> Schema { #[cfg(test)] mod tests { + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::physical_plan::RuntimeEnv; use datatypes::arrow::array::Utf8Array; use datatypes::arrow::datatypes::DataType; @@ -319,32 +320,30 @@ mod tests { use table::table::numbers::NumbersTable; use super::*; - use crate::local::memory::{ - new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider, - }; + use crate::local::memory::new_memory_catalog_list; use crate::CatalogList; #[tokio::test] async fn test_tables() { let catalog_list = new_memory_catalog_list().unwrap(); - let catalog_provider = Arc::new(MemoryCatalogProvider::default()); - let schema = Arc::new(MemorySchemaProvider::new()); + let schema = catalog_list + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .unwrap() + .schema(DEFAULT_SCHEMA_NAME) + .unwrap() + .unwrap(); schema .register_table("test_table".to_string(), Arc::new(NumbersTable::default())) .unwrap(); - catalog_provider - .register_schema("test_schema".to_string(), schema) - .unwrap(); - catalog_list - .register_catalog("test_catalog".to_string(), catalog_provider) - .unwrap(); - let tables = Tables::new(catalog_list, "test_engine".to_string()); + let tables = Tables::new(catalog_list, "test_engine".to_string()); let tables_stream = tables.scan(&None, &[], None).await.unwrap(); let mut tables_stream = tables_stream .execute(0, Arc::new(RuntimeEnv::default())) .await .unwrap(); + if let Some(t) = tables_stream.next().await { let batch = t.unwrap().df_recordbatch; assert_eq!(1, batch.num_rows()); @@ -354,7 +353,7 @@ mod tests { assert_eq!(&DataType::Utf8, batch.column(2).data_type()); assert_eq!(&DataType::Utf8, batch.column(3).data_type()); assert_eq!( - "test_catalog", + "greptime", batch .column(0) .as_any() @@ -364,7 +363,7 @@ mod tests { ); assert_eq!( - "test_schema", + "public", batch .column(1) .as_any() diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 89d4b4dcee..9c37e4eceb 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -1,6 +1,6 @@ use std::ops::Deref; -use api::v1::codec::RegionId; +use api::v1::codec::RegionNumber; use api::v1::{ admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, ObjectExpr, ObjectResult, SelectExpr, @@ -209,13 +209,13 @@ impl GrpcQueryHandler for Instance { })?; // TODO(fys): _region_id is for later use. - let _region_id: Option = insert_expr + let _region_id: Option = insert_expr .options .get("region_id") .map(|id| { id.deref() .try_into() - .context(servers::error::DecodeRegionIdSnafu) + .context(servers::error::DecodeRegionNumberSnafu) }) .transpose()?; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 63e924c18d..2e9edf4bdb 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -14,6 +14,7 @@ common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } +common-catalog = { path = "../common/catalog" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } @@ -21,11 +22,13 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datatypes = { path = "../datatypes" } +futures = "0.3" itertools = "0.10" openmetrics-parser = "0.4" prost = "0.11" query = { path = "../query" } serde = "1.0" +serde_json = "1.0" sqlparser = "0.15" servers = { path = "../servers" } snafu = { version = "0.7", features = ["backtraces"] } @@ -33,6 +36,7 @@ sql = { path = "../sql" } store-api = { path = "../store-api" } table = { path = "../table" } tokio = { version = "1.18", features = ["full"] } +meta-client = {path = "../meta-client"} [dependencies.arrow] package = "arrow2" diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs new file mode 100644 index 0000000000..af1b0da4d2 --- /dev/null +++ b/src/frontend/src/catalog.rs @@ -0,0 +1,270 @@ +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use catalog::error::{ + DeserializePartitionRuleSnafu, InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu, +}; +use catalog::remote::{Kv, KvBackendRef}; +use catalog::{ + CatalogList, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef, +}; +use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue}; +use common_error::ext::BoxedError; +use futures::StreamExt; +use snafu::{OptionExt, ResultExt}; +use table::TableRef; +use tokio::sync::RwLock; + +use crate::error::DatanodeNotAvailableSnafu; +use crate::mock::{DatanodeId, DatanodeInstance}; +use crate::partitioning::range::RangePartitionRule; +use crate::table::DistTable; + +pub type DatanodeInstances = HashMap; + +pub struct FrontendCatalogManager { + backend: KvBackendRef, + datanode_instances: Arc>, +} + +impl FrontendCatalogManager { + #[allow(dead_code)] + pub fn new(backend: KvBackendRef, datanode_instances: Arc>) -> Self { + Self { + backend, + datanode_instances, + } + } +} + +impl CatalogList for FrontendCatalogManager { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + _name: String, + _catalog: CatalogProviderRef, + ) -> catalog::error::Result> { + unimplemented!("Frontend catalog list does not support register catalog") + } + + fn catalog_names(&self) -> catalog::error::Result> { + let backend = self.backend.clone(); + let res = std::thread::spawn(|| { + common_runtime::block_on_read(async move { + let key = common_catalog::build_catalog_prefix(); + let mut iter = backend.range(key.as_bytes()); + let mut res = HashSet::new(); + + while let Some(r) = iter.next().await { + let Kv(k, _) = r?; + let key = CatalogKey::parse(String::from_utf8_lossy(&k)) + .context(InvalidCatalogValueSnafu)?; + res.insert(key.catalog_name); + } + Ok(res.into_iter().collect()) + }) + }) + .join() + .unwrap(); + res + } + + fn catalog(&self, name: &str) -> catalog::error::Result> { + let all_catalogs = self.catalog_names()?; + if all_catalogs.contains(&name.to_string()) { + Ok(Some(Arc::new(FrontendCatalogProvider { + catalog_name: name.to_string(), + backend: self.backend.clone(), + datanode_instances: self.datanode_instances.clone(), + }))) + } else { + Ok(None) + } + } +} + +pub struct FrontendCatalogProvider { + catalog_name: String, + backend: KvBackendRef, + datanode_instances: Arc>, +} + +impl CatalogProvider for FrontendCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> catalog::error::Result> { + let backend = self.backend.clone(); + let catalog_name = self.catalog_name.clone(); + let res = std::thread::spawn(|| { + common_runtime::block_on_read(async move { + let key = common_catalog::build_schema_prefix(&catalog_name); + let mut iter = backend.range(key.as_bytes()); + let mut res = HashSet::new(); + + while let Some(r) = iter.next().await { + let Kv(k, _) = r?; + let key = SchemaKey::parse(String::from_utf8_lossy(&k)) + .context(InvalidCatalogValueSnafu)?; + res.insert(key.schema_name); + } + Ok(res.into_iter().collect()) + }) + }) + .join() + .unwrap(); + res + } + + fn register_schema( + &self, + _name: String, + _schema: SchemaProviderRef, + ) -> catalog::error::Result> { + unimplemented!("Frontend catalog provider does not support register schema") + } + + fn schema(&self, name: &str) -> catalog::error::Result> { + let all_schemas = self.schema_names()?; + if all_schemas.contains(&name.to_string()) { + Ok(Some(Arc::new(FrontendSchemaProvider { + catalog_name: self.catalog_name.clone(), + schema_name: name.to_string(), + backend: self.backend.clone(), + datanode_instances: self.datanode_instances.clone(), + }))) + } else { + Ok(None) + } + } +} + +pub struct FrontendSchemaProvider { + catalog_name: String, + schema_name: String, + backend: KvBackendRef, + datanode_instances: Arc>, +} + +impl SchemaProvider for FrontendSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> catalog::error::Result> { + let backend = self.backend.clone(); + let catalog_name = self.catalog_name.clone(); + let schema_name = self.schema_name.clone(); + + std::thread::spawn(|| { + common_runtime::block_on_read(async move { + let key = common_catalog::build_table_global_prefix(catalog_name, schema_name); + let mut iter = backend.range(key.as_bytes()); + let mut res = HashSet::new(); + + while let Some(r) = iter.next().await { + let Kv(k, _) = r?; + let key = TableGlobalKey::parse(String::from_utf8_lossy(&k)) + .context(InvalidCatalogValueSnafu)?; + res.insert(key.table_name); + } + Ok(res.into_iter().collect()) + }) + }) + .join() + .unwrap() + } + + fn table(&self, name: &str) -> catalog::error::Result> { + let table_global_key = TableGlobalKey { + catalog_name: self.catalog_name.clone(), + schema_name: self.schema_name.clone(), + table_name: name.to_string(), + }; + + let instances = self.datanode_instances.clone(); + let backend = self.backend.clone(); + let table_name = name.to_string(); + let result: Result, catalog::error::Error> = std::thread::spawn(|| { + common_runtime::block_on_read(async move { + let mut datanode_instances = HashMap::new(); + let res = match backend.get(table_global_key.to_string().as_bytes()).await? { + None => { + return Ok(None); + } + Some(r) => r, + }; + + let mut region_to_datanode_map = HashMap::new(); + + let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1)) + .context(InvalidCatalogValueSnafu)?; + let node_id: DatanodeId = val.node_id; + + // TODO(hl): We need to deserialize string to PartitionRule trait object + let partition_rule: Arc = + Arc::new(serde_json::from_str(&val.partition_rules).context( + DeserializePartitionRuleSnafu { + data: &val.partition_rules, + }, + )?); + + for (node_id, region_ids) in val.regions_id_map { + for region_id in region_ids { + region_to_datanode_map.insert(region_id, node_id); + } + } + + datanode_instances.insert( + node_id, + instances + .read() + .await + .get(&node_id) + .context(DatanodeNotAvailableSnafu { node_id }) + .map_err(BoxedError::new) + .context(catalog::error::InternalSnafu)? + .clone(), + ); + + let table = Arc::new(DistTable { + table_name: table_name.clone(), + schema: Arc::new( + val.meta + .schema + .try_into() + .context(InvalidSchemaInCatalogSnafu)?, + ), + partition_rule, + region_dist_map: region_to_datanode_map, + datanode_instances, + }); + Ok(Some(table as _)) + }) + }) + .join() + .unwrap(); + result + } + + fn register_table( + &self, + _name: String, + _table: TableRef, + ) -> catalog::error::Result> { + unimplemented!("Frontend schema provider does not support register table") + } + + fn deregister_table(&self, _name: &str) -> catalog::error::Result> { + unimplemented!("Frontend schema provider does not support deregister table") + } + + fn table_exist(&self, name: &str) -> catalog::error::Result { + Ok(self.table_names()?.contains(&name.to_string())) + } +} diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index ff1a5bee89..08cf3c96b7 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -142,12 +142,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to access catalog, source: {}", source))] - Catalog { - #[snafu(backtrace)] - source: catalog::error::Error, - }, - #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String, @@ -177,6 +171,21 @@ pub enum Error { source: common_runtime::JoinError, backtrace: Backtrace, }, + + #[snafu(display("Failed access catalog: {}", source))] + Catalog { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + + #[snafu(display("Failed to parse catalog entry: {}", source))] + ParseCatalogEntry { + #[snafu(backtrace)] + source: common_catalog::error::Error, + }, + + #[snafu(display("Cannot find datanode by id: {}", node_id))] + DatanodeNotAvailable { node_id: u64, backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -205,8 +214,6 @@ impl ErrorExt for Error { Error::RequestDatanode { source } => source.status_code(), - Error::Catalog { source } => source.status_code(), - Error::ColumnDataType { .. } | Error::FindDatanode { .. } | Error::DatanodeInstance { .. } => StatusCode::Internal, @@ -220,6 +227,9 @@ impl ErrorExt for Error { Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, Error::JoinTask { .. } => StatusCode::Unexpected, + Error::Catalog { source, .. } => source.status_code(), + Error::ParseCatalogEntry { source, .. } => source.status_code(), + Error::DatanodeNotAvailable { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 5a7c36226b..8da90c52e7 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -1,5 +1,6 @@ #![feature(assert_matches)] +mod catalog; pub mod error; pub mod frontend; pub mod grpc; diff --git a/src/frontend/src/mock.rs b/src/frontend/src/mock.rs index 0b55824958..5932aa8ed3 100644 --- a/src/frontend/src/mock.rs +++ b/src/frontend/src/mock.rs @@ -21,7 +21,7 @@ use table::table::adapter::DfTableProviderAdapter; pub(crate) type DatanodeId = u64; #[derive(Clone)] -pub(crate) struct DatanodeInstance { +pub struct DatanodeInstance { pub(crate) datanode_id: DatanodeId, catalog_manager: CatalogManagerRef, db: Database, @@ -37,12 +37,12 @@ impl DatanodeInstance { #[allow(dead_code)] pub(crate) fn new( datanode_id: DatanodeId, - catalog_manager: CatalogManagerRef, + catalog_list: CatalogManagerRef, db: Database, ) -> Self { Self { datanode_id, - catalog_manager, + catalog_manager: catalog_list, db, } } diff --git a/src/frontend/src/partitioning.rs b/src/frontend/src/partitioning.rs index db0432f479..7809640256 100644 --- a/src/frontend/src/partitioning.rs +++ b/src/frontend/src/partitioning.rs @@ -6,7 +6,7 @@ use std::sync::Arc; pub use datafusion_expr::Operator; use datatypes::prelude::Value; -use store_api::storage::RegionId; +use store_api::storage::RegionNumber; pub(crate) type PartitionRuleRef = Arc>; @@ -17,9 +17,9 @@ pub trait PartitionRule: Sync + Send { // TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop. // Or find better names since one is mainly for writes and the other is for reads. - fn find_region(&self, values: &[Value]) -> Result; + fn find_region(&self, values: &[Value]) -> Result; - fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error>; + fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error>; } /// The right bound(exclusive) of partition range. diff --git a/src/frontend/src/partitioning/columns.rs b/src/frontend/src/partitioning/columns.rs index 2ebc78b829..5305483a69 100644 --- a/src/frontend/src/partitioning/columns.rs +++ b/src/frontend/src/partitioning/columns.rs @@ -1,9 +1,10 @@ use datafusion_expr::Operator; use datatypes::value::Value; use snafu::ensure; +use store_api::storage::RegionNumber; use crate::error::{self, Error}; -use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule, RegionId}; +use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule}; /// A [RangeColumnsPartitionRule] is very similar to [RangePartitionRule] except that it allows /// partitioning by multiple columns. @@ -32,7 +33,7 @@ use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule, RegionId struct RangeColumnsPartitionRule { column_list: Vec, value_lists: Vec>, - regions: Vec, + regions: Vec, // TODO(LFC): Implement finding regions by all partitioning columns, not by the first one only. // Singled out the first partitioning column's bounds for finding regions by range. @@ -54,7 +55,7 @@ struct RangeColumnsPartitionRule { // // The following two fields are acted as caches, so we don't need to recalculate them every time. first_column_bounds: Vec, - first_column_regions: Vec>, + first_column_regions: Vec>, } impl RangeColumnsPartitionRule { @@ -65,7 +66,7 @@ impl RangeColumnsPartitionRule { fn new( column_list: Vec, value_lists: Vec>, - regions: Vec, + regions: Vec, ) -> Self { // An example range columns partition rule to calculate the first column bounds and regions: // SQL: @@ -87,7 +88,7 @@ impl RangeColumnsPartitionRule { let mut distinct_bounds = Vec::::new(); distinct_bounds.push(first_column_bounds[0].clone()); - let mut first_column_regions = Vec::>::new(); + let mut first_column_regions = Vec::>::new(); first_column_regions.push(vec![regions[0]]); for i in 1..first_column_bounds.len() { @@ -116,7 +117,7 @@ impl PartitionRule for RangeColumnsPartitionRule { self.column_list.clone() } - fn find_region(&self, values: &[Value]) -> Result { + fn find_region(&self, values: &[Value]) -> Result { ensure!( values.len() == self.column_list.len(), error::RegionKeysSizeSnafu { @@ -137,7 +138,7 @@ impl PartitionRule for RangeColumnsPartitionRule { }) } - fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error> { + fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error> { let regions = if exprs.iter().all(|x| self.column_list.contains(&x.column)) { let PartitionExpr { column: _, @@ -173,7 +174,7 @@ impl PartitionRule for RangeColumnsPartitionRule { .iter() .flatten() .cloned() - .collect::>() + .collect::>() } else { self.regions.clone() }; @@ -224,7 +225,7 @@ mod tests { vec![1, 2, 3, 4, 5, 6], ); - let test = |op: Operator, value: &str, expected_regions: Vec| { + let test = |op: Operator, value: &str, expected_regions: Vec| { let exprs = vec![ // Intentionally fix column b's partition expr to "b < 1". If we support finding // regions by both columns("a" and "b") in the future, some test cases should fail. @@ -242,7 +243,7 @@ mod tests { let regions = rule.find_regions(&exprs).unwrap(); assert_eq!( regions, - expected_regions.into_iter().collect::>() + expected_regions.into_iter().collect::>() ); }; diff --git a/src/frontend/src/partitioning/range.rs b/src/frontend/src/partitioning/range.rs index b67998a912..69119bc29c 100644 --- a/src/frontend/src/partitioning/range.rs +++ b/src/frontend/src/partitioning/range.rs @@ -1,8 +1,10 @@ use datatypes::prelude::*; +use serde::{Deserialize, Serialize}; use snafu::OptionExt; +use store_api::storage::RegionNumber; use crate::error::{self, Error}; -use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId}; +use crate::partitioning::{Operator, PartitionExpr, PartitionRule}; /// [RangePartitionRule] manages the distribution of partitions partitioning by some column's value /// range. It's generated from create table request, using MySQL's syntax: @@ -41,13 +43,14 @@ use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId}; /// // TODO(LFC): Further clarify "partition" and "region". // Could be creating an extra layer between partition and region. -pub(crate) struct RangePartitionRule { +#[derive(Debug, Serialize, Deserialize)] +pub struct RangePartitionRule { column_name: String, // Does not store the last "MAXVALUE" bound; because in this way our binary search in finding // partitions are easier (besides, it's hard to represent "MAXVALUE" in our `Value`). // Then the length of `bounds` is one less than `regions`. bounds: Vec, - regions: Vec, + regions: Vec, } impl RangePartitionRule { @@ -56,7 +59,7 @@ impl RangePartitionRule { pub(crate) fn new( column_name: impl Into, bounds: Vec, - regions: Vec, + regions: Vec, ) -> Self { Self { column_name: column_name.into(), @@ -69,7 +72,7 @@ impl RangePartitionRule { &self.column_name } - fn all_regions(&self) -> &Vec { + fn all_regions(&self) -> &Vec { &self.regions } } @@ -81,11 +84,11 @@ impl PartitionRule for RangePartitionRule { vec![self.column_name().to_string()] } - fn find_region(&self, _values: &[Value]) -> Result { + fn find_region(&self, _values: &[Value]) -> Result { unimplemented!() } - fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error> { + fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error> { if exprs.is_empty() { return Ok(self.regions.clone()); } @@ -152,18 +155,19 @@ mod test { regions: vec![1, 2, 3, 4], }; - let test = |column: &str, op: Operator, value: &str, expected_regions: Vec| { - let expr = PartitionExpr { - column: column.to_string(), - op, - value: value.into(), + let test = + |column: &str, op: Operator, value: &str, expected_regions: Vec| { + let expr = PartitionExpr { + column: column.to_string(), + op, + value: value.into(), + }; + let regions = rule.find_regions(&[expr]).unwrap(); + assert_eq!( + regions, + expected_regions.into_iter().collect::>() + ); }; - let regions = rule.find_regions(&[expr]).unwrap(); - assert_eq!( - regions, - expected_regions.into_iter().collect::>() - ); - }; test("a", Operator::NotEq, "hz", vec![1, 2, 3, 4]); test("a", Operator::NotEq, "what", vec![1, 2, 3, 4]); diff --git a/src/frontend/src/spliter.rs b/src/frontend/src/spliter.rs index b788f82422..a3771afe6a 100644 --- a/src/frontend/src/spliter.rs +++ b/src/frontend/src/spliter.rs @@ -5,7 +5,7 @@ use datatypes::vectors::VectorBuilder; use datatypes::vectors::VectorRef; use snafu::ensure; use snafu::OptionExt; -use store_api::storage::RegionId; +use store_api::storage::RegionNumber; use table::requests::InsertRequest; use crate::error::Error; @@ -15,7 +15,7 @@ use crate::error::InvalidInsertRequestSnafu; use crate::error::Result; use crate::partitioning::PartitionRuleRef; -pub type DistInsertRequest = HashMap; +pub type DistInsertRequest = HashMap; pub struct WriteSpliter { partition_rule: PartitionRuleRef, @@ -41,11 +41,11 @@ impl WriteSpliter { fn split_partitioning_values( &self, values: &[VectorRef], - ) -> Result>> { + ) -> Result>> { if values.is_empty() { return Ok(HashMap::default()); } - let mut region_map: HashMap> = HashMap::new(); + let mut region_map: HashMap> = HashMap::new(); let row_count = values[0].len(); for idx in 0..row_count { let region_id = match self @@ -113,9 +113,9 @@ fn partition_values(partition_columns: &[VectorRef], idx: usize) -> Vec { fn partition_insert_request( insert: &InsertRequest, - region_map: HashMap>, + region_map: HashMap>, ) -> DistInsertRequest { - let mut dist_insert: HashMap> = + let mut dist_insert: HashMap> = HashMap::with_capacity(region_map.len()); let column_count = insert.columns_values.len(); @@ -162,10 +162,12 @@ mod tests { value::Value, vectors::VectorBuilder, }; + use serde::{Deserialize, Serialize}; + use store_api::storage::RegionNumber; use table::requests::InsertRequest; use super::{ - check_req, find_partitioning_values, partition_insert_request, partition_values, RegionId, + check_req, find_partitioning_values, partition_insert_request, partition_values, WriteSpliter, }; use crate::{ @@ -242,13 +244,13 @@ mod tests { #[test] fn test_partition_insert_request() { let insert = mock_insert_request(); - let mut region_map: HashMap> = HashMap::with_capacity(2); + let mut region_map: HashMap> = HashMap::with_capacity(2); region_map.insert(1, vec![2, 0]); region_map.insert(2, vec![1]); let dist_insert = partition_insert_request(&insert, region_map); - let r1_insert = dist_insert.get(&1_u64).unwrap(); + let r1_insert = dist_insert.get(&1_u32).unwrap(); assert_eq!("demo", r1_insert.table_name); let expected: Value = 3_i16.into(); assert_eq!(expected, r1_insert.columns_values.get("id").unwrap().get(0)); @@ -283,7 +285,7 @@ mod tests { .get(1) ); - let r2_insert = dist_insert.get(&2_u64).unwrap(); + let r2_insert = dist_insert.get(&2_u32).unwrap(); assert_eq!("demo", r2_insert.table_name); let expected: Value = 2_i16.into(); assert_eq!(expected, r2_insert.columns_values.get("id").unwrap().get(0)); @@ -401,6 +403,7 @@ mod tests { } } + #[derive(Debug, Serialize, Deserialize)] struct MockPartitionRule; // PARTITION BY LIST COLUMNS(id) ( @@ -414,7 +417,7 @@ mod tests { vec!["id".to_string()] } - fn find_region(&self, values: &[Value]) -> Result { + fn find_region(&self, values: &[Value]) -> Result { let val = values.get(0).unwrap().to_owned(); let id_1: Value = 1_i16.into(); let id_2: Value = 2_i16.into(); @@ -428,7 +431,7 @@ mod tests { unreachable!() } - fn find_regions(&self, _: &[PartitionExpr]) -> Result, Self::Error> { + fn find_regions(&self, _: &[PartitionExpr]) -> Result, Error> { unimplemented!() } } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index ba35893b7f..cc759b05a9 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -14,7 +14,7 @@ use datafusion::logical_plan::Expr as DfExpr; use datafusion::physical_plan::Partitioning; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use snafu::prelude::*; -use store_api::storage::RegionId; +use store_api::storage::RegionNumber; use table::error::Error as TableError; use table::metadata::{FilterPushDownType, TableInfoRef}; use table::requests::InsertRequest; @@ -26,12 +26,12 @@ use crate::mock::{DatanodeId, DatanodeInstance, TableScanPlan}; use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef}; use crate::spliter::WriteSpliter; -struct DistTable { - table_name: String, - schema: SchemaRef, - partition_rule: PartitionRuleRef, - region_dist_map: HashMap, - datanode_instances: HashMap, +pub struct DistTable { + pub table_name: String, + pub schema: SchemaRef, + pub partition_rule: PartitionRuleRef, + pub region_dist_map: HashMap, + pub datanode_instances: HashMap, } #[async_trait] @@ -104,7 +104,7 @@ impl Table for DistTable { impl DistTable { // TODO(LFC): Finding regions now seems less efficient, should be further looked into. - fn find_regions(&self, filters: &[Expr]) -> Result> { + fn find_regions(&self, filters: &[Expr]) -> Result> { let regions = if let Some((first, rest)) = filters.split_first() { let mut target = self.find_regions0(first)?; for filter in rest { @@ -119,7 +119,7 @@ impl DistTable { break; } } - target.into_iter().collect::>() + target.into_iter().collect::>() } else { self.partition_rule.find_regions(&[])? }; @@ -136,7 +136,7 @@ impl DistTable { // - BETWEEN and IN (maybe more) // - expr with arithmetic like "a + 1 < 10" (should have been optimized in logic plan?) // - not comparison or neither "AND" nor "OR" operations, for example, "a LIKE x" - fn find_regions0(&self, filter: &Expr) -> Result> { + fn find_regions0(&self, filter: &Expr) -> Result> { let expr = filter.df_expr(); match expr { DfExpr::BinaryExpr { left, op, right } if is_compare_op(op) => { @@ -156,7 +156,7 @@ impl DistTable { .partition_rule .find_regions(&[PartitionExpr::new(column, op, value)])? .into_iter() - .collect::>()); + .collect::>()); } } DfExpr::BinaryExpr { left, op, right } @@ -168,11 +168,11 @@ impl DistTable { Operator::And => left_regions .intersection(&right_regions) .cloned() - .collect::>(), + .collect::>(), Operator::Or => left_regions .union(&right_regions) .cloned() - .collect::>(), + .collect::>(), _ => unreachable!(), }; return Ok(regions); @@ -185,10 +185,13 @@ impl DistTable { .partition_rule .find_regions(&[])? .into_iter() - .collect::>()) + .collect::>()) } - fn find_datanodes(&self, regions: Vec) -> Result>> { + fn find_datanodes( + &self, + regions: Vec, + ) -> Result>> { let mut datanodes = HashMap::new(); for region in regions.iter() { let datanode = *self @@ -344,6 +347,7 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_dist_table_scan() { + common_telemetry::init_default_ut_logging(); let table = Arc::new(new_dist_table().await); // should scan all regions @@ -434,7 +438,7 @@ mod test { let partition_rule = RangePartitionRule::new( "a", vec![10_i32.into(), 20_i32.into(), 50_i32.into()], - vec![1_u64, 2, 3, 4], + vec![1_u32, 2, 3, 4], ); let table1 = new_memtable(schema.clone(), (0..5).collect::>()); @@ -458,7 +462,7 @@ mod test { table_name: "dist_numbers".to_string(), schema, partition_rule: Arc::new(partition_rule), - region_dist_map: HashMap::from([(1_u64, 1), (2_u64, 2), (3_u64, 3), (4_u64, 4)]), + region_dist_map: HashMap::from([(1_u32, 1), (2_u32, 2), (3_u32, 3), (4_u32, 4)]), datanode_instances, } } @@ -491,20 +495,21 @@ mod test { let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); instance.start().await.unwrap(); - let catalog_manager = instance.catalog_manager().clone(); + let client = crate::tests::create_datanode_client(instance).await; + + let table_name = table.table_name().to_string(); catalog_manager .register_table(RegisterTableRequest { catalog: "greptime".to_string(), schema: "public".to_string(), - table_name: table.table_name().to_string(), + table_name: table_name.clone(), table_id: 1234, table: Arc::new(table), }) .await .unwrap(); - let client = crate::tests::create_datanode_client(instance).await; DatanodeInstance::new( datanode_id, catalog_manager, @@ -516,7 +521,7 @@ mod test { async fn test_find_regions() { let table = new_dist_table().await; - let test = |filters: Vec, expect_regions: Vec| { + let test = |filters: Vec, expect_regions: Vec| { let mut regions = table.find_regions(filters.as_slice()).unwrap(); regions.sort(); diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index fea776dbe7..01fd2a1852 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -11,7 +11,7 @@ use client::ObjectResult; use snafu::ensure; use snafu::OptionExt; use snafu::ResultExt; -use store_api::storage::RegionId; +use store_api::storage::RegionNumber; use table::requests::InsertRequest; use super::DistTable; @@ -21,7 +21,7 @@ use crate::error::Result; impl DistTable { pub async fn dist_insert( &self, - inserts: HashMap, + inserts: HashMap, ) -> Result { let mut joins = Vec::with_capacity(inserts.len()); @@ -66,7 +66,7 @@ impl DistTable { } } -fn to_insert_expr(region_id: RegionId, insert: InsertRequest) -> Result { +fn to_insert_expr(region_id: RegionNumber, insert: InsertRequest) -> Result { let mut row_count = None; let columns = insert @@ -109,7 +109,7 @@ fn to_insert_expr(region_id: RegionId, insert: InsertRequest) -> Result Arc { let schema_provider = Arc::new(MemorySchemaProvider::new()); let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - let catalog_list = Arc::new(MemoryCatalogList::default()); + let catalog_list = Arc::new(MemoryCatalogManager::default()); let mut column_schemas = vec![]; let mut columns = vec![]; diff --git a/src/query/tests/my_sum_udaf_example.rs b/src/query/tests/my_sum_udaf_example.rs index 40170b5d89..b28d8e9ddf 100644 --- a/src/query/tests/my_sum_udaf_example.rs +++ b/src/query/tests/my_sum_udaf_example.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; -use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_function::scalars::aggregate::AggregateFunctionMeta; @@ -242,7 +242,7 @@ fn new_query_engine_factory(table: MemTable) -> QueryEngineFactory { let schema_provider = Arc::new(MemorySchemaProvider::new()); let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - let catalog_list = Arc::new(MemoryCatalogList::default()); + let catalog_list = Arc::new(MemoryCatalogManager::default()); schema_provider.register_table(table_name, table).unwrap(); catalog_provider diff --git a/src/query/tests/percentile_test.rs b/src/query/tests/percentile_test.rs index 3737f5d3df..e809554b08 100644 --- a/src/query/tests/percentile_test.rs +++ b/src/query/tests/percentile_test.rs @@ -1,6 +1,6 @@ use std::sync::Arc; mod function; -use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; @@ -114,7 +114,7 @@ fn create_correctness_engine() -> Arc { // create engine let schema_provider = Arc::new(MemorySchemaProvider::new()); let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - let catalog_list = Arc::new(MemoryCatalogList::default()); + let catalog_list = Arc::new(MemoryCatalogManager::default()); let mut column_schemas = vec![]; let mut columns = vec![]; diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index b763ab7485..fde9f951c1 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -3,7 +3,7 @@ mod pow; use std::sync::Arc; use arrow::array::UInt32Array; -use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::prelude::{create_udf, make_scalar_function, Volatility}; @@ -150,7 +150,7 @@ async fn test_udf() -> Result<()> { fn create_query_engine() -> Arc { let schema_provider = Arc::new(MemorySchemaProvider::new()); let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - let catalog_list = Arc::new(MemoryCatalogList::default()); + let catalog_list = Arc::new(MemoryCatalogManager::default()); // create table with primitives, and all columns' length are even let mut column_schemas = vec![]; diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 39f46de3d1..8963c6ae7c 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -154,7 +154,7 @@ pub enum Error { InvalidPromRemoteReadQueryResult { msg: String, backtrace: Backtrace }, #[snafu(display("Failed to decode region id, source: {}", source))] - DecodeRegionId { source: api::DecodeError }, + DecodeRegionNumber { source: api::DecodeError }, #[snafu(display("Failed to build gRPC reflection service, source: {}", source))] GrpcReflectionService { @@ -196,7 +196,7 @@ impl ErrorExt for Error { | DecodePromRemoteRequest { .. } | DecompressPromRemoteRequest { .. } | InvalidPromRemoteRequest { .. } - | DecodeRegionId { .. } + | DecodeRegionNumber { .. } | TimePrecision { .. } => StatusCode::InvalidArguments, InfluxdbLinesWrite { source, .. } => source.status_code(), diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 25726a773a..065f7dec7d 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use async_trait::async_trait; -use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider}; +use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; @@ -70,7 +70,7 @@ fn create_testing_sql_query_handler(table: MemTable) -> SqlQueryHandlerRef { let schema_provider = Arc::new(MemorySchemaProvider::new()); let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - let catalog_list = Arc::new(MemoryCatalogList::default()); + let catalog_list = Arc::new(MemoryCatalogManager::default()); schema_provider.register_table(table_name, table).unwrap(); catalog_provider .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index b10b46dbd3..cb5dd5075d 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -10,6 +10,8 @@ pub type ColumnFamilyId = u32; /// Id of the region. pub type RegionId = u64; +pub type RegionNumber = u32; + /// A [ColumnDescriptor] contains information to create a column. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)] #[builder(pattern = "owned", build_fn(validate = "Self::validate"))]