diff --git a/Cargo.lock b/Cargo.lock index ade6eba31d..f5c2bda0d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -460,6 +460,20 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand 0.8.5", + "tokio", +] + [[package]] name = "backon" version = "0.1.0" @@ -708,6 +722,7 @@ dependencies = [ "arc-swap", "async-stream", "async-trait", + "backoff", "chrono", "common-catalog", "common-error", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 33248231d8..ef7eeb46c8 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -9,6 +9,7 @@ api = { path = "../api" } arc-swap = "1.0" async-stream = "0.3" async-trait = "0.1" +backoff = { version = "0.4", features = ["tokio"] } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 99e81a5767..d4ad62fd21 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -149,6 +149,12 @@ pub enum Error { #[snafu(backtrace)] source: meta_client::error::Error, }, + + #[snafu(display("Failed to bump table id"))] + BumpTableId { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to parse table id from metasrv, data: {:?}", data))] + ParseTableId { data: String, backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -185,6 +191,9 @@ impl ErrorExt for Error { Error::SystemCatalogTableScan { source } => source.status_code(), Error::SystemCatalogTableScanExec { source } => source.status_code(), Error::InvalidTableSchema { source, .. } => source.status_code(), + Error::BumpTableId { .. } | Error::ParseTableId { .. } => { + StatusCode::StorageUnavailable + } } } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 2c6cbb254e..36544361e2 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -67,32 +67,27 @@ pub type CatalogProviderRef = Arc; #[async_trait::async_trait] pub trait CatalogManager: CatalogList { /// Starts a catalog manager. - async fn start(&self) -> error::Result<()>; + async fn start(&self) -> Result<()>; /// Returns next available table id. - fn next_table_id(&self) -> TableId; + async fn next_table_id(&self) -> Result; /// Registers a table given given catalog/schema to catalog manager, /// returns table registered. - async fn register_table(&self, request: RegisterTableRequest) -> error::Result; + async fn register_table(&self, request: RegisterTableRequest) -> Result; /// Register a system table, should be called before starting the manager. async fn register_system_table(&self, request: RegisterSystemTableRequest) -> error::Result<()>; /// Returns the table by catalog, schema and table name. - fn table( - &self, - catalog: &str, - schema: &str, - table_name: &str, - ) -> error::Result>; + fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Result>; } pub type CatalogManagerRef = Arc; /// Hook called after system table opening. -pub type OpenSystemTableHook = Arc error::Result<()> + Send + Sync>; +pub type OpenSystemTableHook = Arc Result<()> + Send + Sync>; /// Register system table request: /// - When system table is already created and registered, the hook will be called diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index f35f7b726c..e678e9e8dd 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -272,8 +272,8 @@ impl CatalogManager for LocalCatalogManager { } #[inline] - fn next_table_id(&self) -> TableId { - self.next_table_id.fetch_add(1, Ordering::Relaxed) + async fn next_table_id(&self) -> Result { + Ok(self.next_table_id.fetch_add(1, Ordering::Relaxed)) } async fn register_table(&self, request: RegisterTableRequest) -> Result { diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs index d0f343932a..c2d525ff12 100644 --- a/src/catalog/src/remote.rs +++ b/src/catalog/src/remote.rs @@ -19,13 +19,27 @@ pub type ValueIter<'a, E> = Pin> + Send + 'a #[async_trait::async_trait] pub trait KvBackend: Send + Sync { - fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, crate::error::Error> + fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error> where 'a: 'b; - async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), crate::error::Error>; + async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error>; - async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), crate::error::Error>; + /// Compare and set value of key. `expect` is the expected value, if backend's current value associated + /// with key is the same as `expect`, the value will be updated to `val`. + /// + /// - If the compare-and-set operation successfully updated value, this method will return an `Ok(Ok())` + /// - If associated value is not the same as `expect`, no value will be updated and an `Ok(Err(Vec))` + /// will be returned, the `Err(Vec)` indicates the current associated value of key. + /// - If any error happens during operation, an `Err(Error)` will be returned. + async fn compare_and_set( + &self, + key: &[u8], + expect: &[u8], + val: &[u8], + ) -> Result>>, Error>; + + async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error>; async fn delete(&self, key: &[u8]) -> Result<(), Error> { self.delete_range(key, &[]).await @@ -74,6 +88,15 @@ mod tests { unimplemented!() } + async fn compare_and_set( + &self, + _key: &[u8], + _expect: &[u8], + _val: &[u8], + ) -> Result>>, Error> { + unimplemented!() + } + async fn delete_range(&self, _key: &[u8], _end: &[u8]) -> Result<(), Error> { unimplemented!() } diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/remote/client.rs index a9df193a00..4099ccf03e 100644 --- a/src/catalog/src/remote/client.rs +++ b/src/catalog/src/remote/client.rs @@ -3,7 +3,7 @@ use std::fmt::Debug; use async_stream::stream; use common_telemetry::info; use meta_client::client::MetaClient; -use meta_client::rpc::{DeleteRangeRequest, PutRequest, RangeRequest}; +use meta_client::rpc::{CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest}; use snafu::ResultExt; use crate::error::{Error, MetaSrvSnafu}; @@ -68,4 +68,26 @@ impl KvBackend for MetaKvBackend { Ok(()) } + + async fn compare_and_set( + &self, + key: &[u8], + expect: &[u8], + val: &[u8], + ) -> Result>>, Error> { + let request = CompareAndPutRequest::new() + .with_key(key.to_vec()) + .with_expect(expect.to_vec()) + .with_value(val.to_vec()); + let mut response = self + .client + .compare_and_put(request) + .await + .context(MetaSrvSnafu)?; + if response.is_success() { + Ok(Ok(())) + } else { + Ok(Err(response.take_prev_kv().map(|v| v.value().to_vec()))) + } + } } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index b89f427b05..63af7cec4d 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -1,30 +1,32 @@ use std::any::Any; use std::collections::HashMap; use std::pin::Pin; -use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use std::time::Duration; use arc_swap::ArcSwap; use async_stream::stream; +use backoff::exponential::ExponentialBackoffBuilder; +use backoff::ExponentialBackoff; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_catalog::{ - build_catalog_prefix, build_schema_prefix, build_table_prefix, CatalogKey, CatalogValue, - SchemaKey, SchemaValue, TableKey, TableValue, + build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, + SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, }; -use common_telemetry::{debug, info}; +use common_telemetry::{debug, error, info}; use futures::Stream; use futures_util::StreamExt; use snafu::{OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; -use table::metadata::{TableId, TableVersion}; +use table::metadata::TableId; use table::requests::{CreateTableRequest, OpenTableRequest}; use table::table::numbers::NumbersTable; use table::TableRef; use tokio::sync::Mutex; use crate::error::{ - CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, - SchemaNotFoundSnafu, TableExistsSnafu, + BumpTableIdSnafu, CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, + OpenTableSnafu, ParseTableIdSnafu, SchemaNotFoundSnafu, TableExistsSnafu, }; use crate::error::{InvalidTableSchemaSnafu, Result}; use crate::remote::{Kv, KvBackendRef}; @@ -38,7 +40,6 @@ pub struct RemoteCatalogManager { node_id: u64, backend: KvBackendRef, catalogs: Arc>>, - next_table_id: Arc, engine: TableEngineRef, system_table_requests: Mutex>, mutex: Arc>, @@ -51,7 +52,6 @@ impl RemoteCatalogManager { node_id, backend, catalogs: Default::default(), - next_table_id: Default::default(), system_table_requests: Default::default(), mutex: Default::default(), } @@ -60,14 +60,12 @@ impl RemoteCatalogManager { fn build_catalog_key(&self, catalog_name: impl AsRef) -> CatalogKey { CatalogKey { catalog_name: catalog_name.as_ref().to_string(), - node_id: self.node_id, } } fn new_catalog_provider(&self, catalog_name: &str) -> CatalogProviderRef { Arc::new(RemoteCatalogProvider { catalog_name: catalog_name.to_string(), - node_id: self.node_id, backend: self.backend.clone(), schemas: Default::default(), mutex: Default::default(), @@ -100,9 +98,7 @@ impl RemoteCatalogManager { } let key = CatalogKey::parse(&String::from_utf8_lossy(&k)) .context(InvalidCatalogValueSnafu)?; - if key.node_id == self.node_id { - yield Ok(key) - } + yield Ok(key) } })) } @@ -124,10 +120,7 @@ impl RemoteCatalogManager { let schema_key = SchemaKey::parse(&String::from_utf8_lossy(&k)) .context(InvalidCatalogValueSnafu)?; - - if schema_key.node_id == self.node_id { - yield Ok(schema_key) - } + yield Ok(schema_key) } })) } @@ -139,8 +132,8 @@ impl RemoteCatalogManager { &self, catalog_name: &str, schema_name: &str, - ) -> Pin> + Send + '_>> { - let table_prefix = build_table_prefix(catalog_name, schema_name); + ) -> Pin> + Send + '_>> { + let table_prefix = build_table_global_prefix(catalog_name, schema_name); let mut tables = self.backend.range(table_prefix.as_bytes()); Box::pin(stream!({ while let Some(r) = tables.next().await { @@ -149,12 +142,22 @@ impl RemoteCatalogManager { debug!("Ignoring non-table prefix: {}", String::from_utf8_lossy(&k)); continue; } - let table_key = TableKey::parse(&String::from_utf8_lossy(&k)) + let table_key = TableGlobalKey::parse(&String::from_utf8_lossy(&k)) .context(InvalidCatalogValueSnafu)?; - let table_value = TableValue::parse(&String::from_utf8_lossy(&v)) + let table_value = TableGlobalValue::parse(&String::from_utf8_lossy(&v)) .context(InvalidCatalogValueSnafu)?; - if table_value.node_id == self.node_id { + debug!( + "Found catalog table entry, key: {}, value: {:?}", + table_key, table_value + ); + // metasrv has allocated region ids to current datanode + if table_value + .regions_id_map + .get(&self.node_id) + .map(|v| !v.is_empty()) + .unwrap_or(false) + { yield Ok((table_key, table_value)) } } @@ -250,14 +253,13 @@ impl RemoteCatalogManager { let schema_key = SchemaKey { schema_name: DEFAULT_SCHEMA_NAME.to_string(), catalog_name: DEFAULT_CATALOG_NAME.to_string(), - node_id: self.node_id, } .to_string(); self.backend .set( schema_key.as_bytes(), &SchemaValue {} - .to_bytes() + .as_bytes() .context(InvalidCatalogValueSnafu)?, ) .await?; @@ -265,14 +267,13 @@ impl RemoteCatalogManager { let catalog_key = CatalogKey { catalog_name: DEFAULT_CATALOG_NAME.to_string(), - node_id: self.node_id, } .to_string(); self.backend .set( catalog_key.as_bytes(), &CatalogValue {} - .to_bytes() + .as_bytes() .context(InvalidCatalogValueSnafu)?, ) .await?; @@ -282,18 +283,23 @@ impl RemoteCatalogManager { async fn open_or_create_table( &self, - table_key: &TableKey, - table_value: &TableValue, + table_key: &TableGlobalKey, + table_value: &TableGlobalValue, ) -> Result { let context = EngineContext {}; - let TableKey { + let TableGlobalKey { catalog_name, schema_name, table_name, .. } = table_key; - let TableValue { id, meta, .. } = table_value; + let TableGlobalValue { + id, + meta, + regions_id_map, + .. + } = table_value; let request = OpenTableRequest { catalog_name: catalog_name.clone(), @@ -325,7 +331,7 @@ impl RemoteCatalogManager { table_name: table_name.clone(), desc: None, schema: Arc::new(schema), - region_numbers: meta.region_numbers.clone(), + region_numbers: regions_id_map.get(&self.node_id).unwrap().clone(), // this unwrap is safe because region_id_map is checked in `iter_remote_tables` primary_key_indices: meta.primary_key_indices.clone(), create_if_not_exists: true, table_options: meta.options.clone(), @@ -354,8 +360,6 @@ impl CatalogManager for RemoteCatalogManager { catalogs.keys().cloned().collect::>() ); self.catalogs.store(Arc::new(catalogs)); - self.next_table_id - .store(max_table_id + 1, Ordering::Relaxed); info!("Max table id allocated: {}", max_table_id); let mut system_table_requests = self.system_table_requests.lock().await; @@ -373,8 +377,61 @@ impl CatalogManager for RemoteCatalogManager { Ok(()) } - fn next_table_id(&self) -> TableId { - self.next_table_id.fetch_add(1, Ordering::Relaxed) + /// Bump table id in a CAS manner with backoff. + async fn next_table_id(&self) -> Result { + let key = common_catalog::consts::TABLE_ID_KEY_PREFIX.as_bytes(); + let op = || async { + // TODO(hl): optimize this get + let (prev, prev_bytes) = match self.backend.get(key).await? { + None => (MIN_USER_TABLE_ID, vec![]), + Some(kv) => (parse_table_id(&kv.1)?, kv.1), + }; + + match self + .backend + .compare_and_set(key, &prev_bytes, &(prev + 1).to_le_bytes()) + .await + { + Ok(cas_res) => match cas_res { + Ok(_) => Ok(prev), + Err(e) => { + info!("Table id {:?} already occupied", e); + Err(backoff::Error::transient( + BumpTableIdSnafu { + msg: "Table id occupied", + } + .build(), + )) + } + }, + Err(e) => { + error!(e;"Failed to CAS table id"); + Err(backoff::Error::permanent( + BumpTableIdSnafu { + msg: format!("Failed to perform CAS operation: {:?}", e), + } + .build(), + )) + } + } + }; + + let retry_policy: ExponentialBackoff = ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(4)) + .with_multiplier(2.0) + .with_max_interval(Duration::from_millis(1000)) + .with_max_elapsed_time(Some(Duration::from_millis(3000))) + .build(); + + backoff::future::retry(retry_policy, op).await.map_err(|e| { + BumpTableIdSnafu { + msg: format!( + "Bump table id exceeds max fail times, last error msg: {:?}", + e + ), + } + .build() + }) } async fn register_table(&self, request: RegisterTableRequest) -> Result { @@ -445,7 +502,7 @@ impl CatalogList for RemoteCatalogManager { .set( key.as_bytes(), &CatalogValue {} - .to_bytes() + .as_bytes() .context(InvalidCatalogValueSnafu)?, ) .await?; @@ -474,17 +531,15 @@ impl CatalogList for RemoteCatalogManager { pub struct RemoteCatalogProvider { catalog_name: String, - node_id: u64, backend: KvBackendRef, schemas: Arc>>, mutex: Arc>, } impl RemoteCatalogProvider { - pub fn new(catalog_name: String, node_id: u64, backend: KvBackendRef) -> Self { + pub fn new(catalog_name: String, backend: KvBackendRef) -> Self { Self { catalog_name, - node_id, backend, schemas: Default::default(), mutex: Default::default(), @@ -495,7 +550,6 @@ impl RemoteCatalogProvider { SchemaKey { catalog_name: self.catalog_name.clone(), schema_name: schema_name.as_ref().to_string(), - node_id: self.node_id, } } } @@ -526,7 +580,7 @@ impl CatalogProvider for RemoteCatalogProvider { .set( key.as_bytes(), &SchemaValue {} - .to_bytes() + .as_bytes() .context(InvalidCatalogValueSnafu)?, ) .await?; @@ -548,6 +602,16 @@ impl CatalogProvider for RemoteCatalogProvider { } } +/// Parse u8 slice to `TableId` +fn parse_table_id(val: &[u8]) -> Result { + Ok(TableId::from_le_bytes(val.try_into().map_err(|_| { + ParseTableIdSnafu { + data: format!("{:?}", val), + } + .build() + })?)) +} + pub struct RemoteSchemaProvider { catalog_name: String, schema_name: String, @@ -574,16 +638,11 @@ impl RemoteSchemaProvider { } } - fn build_table_key( - &self, - table_name: impl AsRef, - table_version: TableVersion, - ) -> TableKey { - TableKey { + fn build_regional_table_key(&self, table_name: impl AsRef) -> TableRegionalKey { + TableRegionalKey { catalog_name: self.catalog_name.clone(), schema_name: self.schema_name.clone(), table_name: table_name.as_ref().to_string(), - version: table_version, node_id: self.node_id, } } @@ -605,19 +664,14 @@ impl SchemaProvider for RemoteSchemaProvider { fn register_table(&self, name: String, table: TableRef) -> Result> { let table_info = table.table_info(); let table_version = table_info.ident.version; - let table_value = TableValue { - meta: table_info.meta.clone().into(), - id: table_info.ident.table_id, - node_id: self.node_id, + let table_value = TableRegionalValue { + version: table_version, regions_ids: table.table_info().meta.region_numbers.clone(), }; let backend = self.backend.clone(); let mutex = self.mutex.clone(); let tables = self.tables.clone(); - - let table_key = self - .build_table_key(name.clone(), table_version) - .to_string(); + let table_key = self.build_regional_table_key(&name).to_string(); let prev = std::thread::spawn(move || { common_runtime::block_on_read(async move { @@ -647,18 +701,11 @@ impl SchemaProvider for RemoteSchemaProvider { } fn deregister_table(&self, name: &str) -> Result> { - let table_version = match self.tables.load().get(name) { - None => return Ok(None), - Some(t) => t.table_info().ident.version, - }; - let table_name = name.to_string(); - let table_key = self.build_table_key(&table_name, table_version).to_string(); - + let table_key = self.build_regional_table_key(&table_name).to_string(); let backend = self.backend.clone(); let mutex = self.mutex.clone(); let tables = self.tables.clone(); - let prev = std::thread::spawn(move || { common_runtime::block_on_read(async move { let _guard = mutex.lock().await; @@ -686,3 +733,17 @@ impl SchemaProvider for RemoteSchemaProvider { Ok(self.tables.load().contains_key(name)) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_table_id() { + assert_eq!(12, parse_table_id(&12_i32.to_le_bytes()).unwrap()); + let mut data = vec![]; + data.extend_from_slice(&12_i32.to_le_bytes()); + data.push(0); + assert!(parse_table_id(&data).is_err()); + } +} diff --git a/src/catalog/tests/mock.rs b/src/catalog/tests/mock.rs index e19a5c914c..f2aad50619 100644 --- a/src/catalog/tests/mock.rs +++ b/src/catalog/tests/mock.rs @@ -1,3 +1,4 @@ +use std::collections::btree_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::fmt::{Display, Formatter}; use std::str::FromStr; @@ -68,6 +69,34 @@ impl KvBackend for MockKvBackend { Ok(()) } + async fn compare_and_set( + &self, + key: &[u8], + expect: &[u8], + val: &[u8], + ) -> Result>>, Error> { + let mut map = self.map.write().await; + let existing = map.entry(key.to_vec()); + match existing { + Entry::Vacant(e) => { + if expect.is_empty() { + e.insert(val.to_vec()); + Ok(Ok(())) + } else { + Ok(Err(None)) + } + } + Entry::Occupied(mut existing) => { + if existing.get() == expect { + existing.insert(val.to_vec()); + Ok(Ok(())) + } else { + Ok(Err(Some(existing.get().clone()))) + } + } + } + } + async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> { let start = key.to_vec(); let end = end.to_vec(); diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index b54d15275c..f29740ef28 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -12,7 +12,7 @@ mod tests { KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider, }; use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use datatypes::schema::Schema; use futures_util::StreamExt; @@ -24,19 +24,17 @@ mod tests { #[tokio::test] async fn test_backend() { common_telemetry::init_default_ut_logging(); - let node_id = 42; let backend = MockKvBackend::default(); let default_catalog_key = CatalogKey { catalog_name: DEFAULT_CATALOG_NAME.to_string(), - node_id, } .to_string(); backend .set( default_catalog_key.as_bytes(), - &CatalogValue {}.to_bytes().unwrap(), + &CatalogValue {}.as_bytes().unwrap(), ) .await .unwrap(); @@ -44,11 +42,10 @@ mod tests { let schema_key = SchemaKey { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), - node_id, } .to_string(); backend - .set(schema_key.as_bytes(), &SchemaValue {}.to_bytes().unwrap()) + .set(schema_key.as_bytes(), &SchemaValue {}.as_bytes().unwrap()) .await .unwrap(); @@ -59,7 +56,7 @@ mod tests { res.insert(String::from_utf8_lossy(&kv.0).to_string()); } assert_eq!( - vec!["__c-greptime-42".to_string()], + vec!["__c-greptime".to_string()], res.into_iter().collect::>() ); } @@ -209,7 +206,6 @@ mod tests { let schema_name = "nonexistent_schema".to_string(); let catalog = Arc::new(RemoteCatalogProvider::new( catalog_name.clone(), - node_id, backend.clone(), )); @@ -281,4 +277,19 @@ mod tests { new_catalog.schema_names().unwrap().into_iter().collect() ) } + + #[tokio::test] + async fn test_next_table_id() { + let node_id = 42; + let (_, _, catalog_manager) = prepare_components(node_id).await; + assert_eq!( + MIN_USER_TABLE_ID, + catalog_manager.next_table_id().await.unwrap() + ); + + assert_eq!( + MIN_USER_TABLE_ID + 1, + catalog_manager.next_table_id().await.unwrap() + ); + } } diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index db6b41af3a..5bd94a75fc 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -14,4 +14,6 @@ pub const SCRIPTS_TABLE_ID: u32 = 1; pub(crate) const CATALOG_KEY_PREFIX: &str = "__c"; pub(crate) const SCHEMA_KEY_PREFIX: &str = "__s"; -pub(crate) const TABLE_KEY_PREFIX: &str = "__t"; +pub(crate) const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg"; +pub(crate) const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr"; +pub const TABLE_ID_KEY_PREFIX: &str = "__tid"; diff --git a/src/common/catalog/src/helper.rs b/src/common/catalog/src/helper.rs index db8e9b5565..52f64c4453 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/common/catalog/src/helper.rs @@ -1,5 +1,5 @@ +use std::collections::HashMap; use std::fmt::{Display, Formatter}; -use std::str::FromStr; use lazy_static::lazy_static; use regex::Regex; @@ -7,29 +7,38 @@ use serde::{Deserialize, Serialize, Serializer}; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::{RawTableMeta, TableId, TableVersion}; -use crate::consts::{CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_KEY_PREFIX}; +use crate::consts::{ + CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX, +}; use crate::error::{ - DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, ParseNodeIdSnafu, - SerializeCatalogEntryValueSnafu, + DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu, }; lazy_static! { static ref CATALOG_KEY_PATTERN: Regex = - Regex::new(&format!("^{}-([a-zA-Z_]+)-([0-9]+)$", CATALOG_KEY_PREFIX)).unwrap(); + Regex::new(&format!("^{}-([a-zA-Z_]+)$", CATALOG_KEY_PREFIX)).unwrap(); } lazy_static! { static ref SCHEMA_KEY_PATTERN: Regex = Regex::new(&format!( - "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)$", + "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)$", SCHEMA_KEY_PREFIX )) .unwrap(); } lazy_static! { - static ref TABLE_KEY_PATTERN: Regex = Regex::new(&format!( - "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)-([0-9]+)$", - TABLE_KEY_PREFIX + static ref TABLE_GLOBAL_KEY_PATTERN: Regex = Regex::new(&format!( + "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)$", + TABLE_GLOBAL_KEY_PREFIX + )) + .unwrap(); +} + +lazy_static! { + static ref TABLE_REGIONAL_KEY_PATTERN: Regex = Regex::new(&format!( + "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)$", + TABLE_REGIONAL_KEY_PREFIX )) .unwrap(); } @@ -42,26 +51,92 @@ pub fn build_schema_prefix(catalog_name: impl AsRef) -> String { format!("{}-{}-", SCHEMA_KEY_PREFIX, catalog_name.as_ref()) } -pub fn build_table_prefix(catalog_name: impl AsRef, schema_name: impl AsRef) -> String { +pub fn build_table_global_prefix( + catalog_name: impl AsRef, + schema_name: impl AsRef, +) -> String { format!( "{}-{}-{}-", - TABLE_KEY_PREFIX, + TABLE_GLOBAL_KEY_PREFIX, catalog_name.as_ref(), schema_name.as_ref() ) } -pub struct TableKey { +pub fn build_table_regional_prefix( + catalog_name: impl AsRef, + schema_name: impl AsRef, +) -> String { + format!( + "{}-{}-{}-", + TABLE_REGIONAL_KEY_PREFIX, + catalog_name.as_ref(), + schema_name.as_ref() + ) +} + +/// Table global info has only one key across all datanodes so it does not have `node_id` field. +pub struct TableGlobalKey { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, +} + +impl Display for TableGlobalKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(TABLE_GLOBAL_KEY_PREFIX)?; + f.write_str("-")?; + f.write_str(&self.catalog_name)?; + f.write_str("-")?; + f.write_str(&self.schema_name)?; + f.write_str("-")?; + f.write_str(&self.table_name) + } +} + +impl TableGlobalKey { + pub fn parse>(s: S) -> Result { + let key = s.as_ref(); + let captures = TABLE_GLOBAL_KEY_PATTERN + .captures(key) + .context(InvalidCatalogSnafu { key })?; + ensure!(captures.len() == 4, InvalidCatalogSnafu { key }); + + Ok(Self { + catalog_name: captures[1].to_string(), + schema_name: captures[2].to_string(), + table_name: captures[3].to_string(), + }) + } +} + +/// Table global info contains necessary info for a datanode to create table regions, including +/// table id, table meta(schema...), region id allocation across datanodes. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TableGlobalValue { + /// Table id is the same across all datanodes. + pub id: TableId, + /// Id of datanode that created the global table info kv. only for debugging. + pub node_id: u64, + /// Allocation of region ids across all datanodes. + pub regions_id_map: HashMap>, + /// Node id -> region ids + pub meta: RawTableMeta, + /// Partition rules for table + pub partition_rules: String, +} + +/// Table regional info that varies between datanode, so it contains a `node_id` field. +pub struct TableRegionalKey { pub catalog_name: String, pub schema_name: String, pub table_name: String, - pub version: TableVersion, pub node_id: u64, } -impl Display for TableKey { +impl Display for TableRegionalKey { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(TABLE_KEY_PREFIX)?; + f.write_str(TABLE_REGIONAL_KEY_PREFIX)?; f.write_str("-")?; f.write_str(&self.catalog_name)?; f.write_str("-")?; @@ -69,68 +144,47 @@ impl Display for TableKey { f.write_str("-")?; f.write_str(&self.table_name)?; f.write_str("-")?; - f.serialize_u64(self.version)?; - f.write_str("-")?; f.serialize_u64(self.node_id) } } -impl TableKey { +impl TableRegionalKey { pub fn parse>(s: S) -> Result { let key = s.as_ref(); - let captures = TABLE_KEY_PATTERN + let captures = TABLE_REGIONAL_KEY_PATTERN .captures(key) .context(InvalidCatalogSnafu { key })?; - ensure!(captures.len() == 6, InvalidCatalogSnafu { key }); - - let version = - u64::from_str(&captures[4]).map_err(|_| InvalidCatalogSnafu { key }.build())?; - let node_id_str = captures[5].to_string(); - let node_id = u64::from_str(&node_id_str) - .map_err(|_| ParseNodeIdSnafu { key: node_id_str }.build())?; + ensure!(captures.len() == 5, InvalidCatalogSnafu { key }); + let node_id = captures[4] + .to_string() + .parse() + .map_err(|_| InvalidCatalogSnafu { key }.build())?; Ok(Self { catalog_name: captures[1].to_string(), schema_name: captures[2].to_string(), table_name: captures[3].to_string(), - version, node_id, }) } } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct TableValue { - pub id: TableId, - pub node_id: u64, +/// Regional table info of specific datanode, including table version on that datanode and +/// region ids allocated by metasrv. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct TableRegionalValue { + pub version: TableVersion, pub regions_ids: Vec, - pub meta: RawTableMeta, -} - -impl TableValue { - pub fn parse(s: impl AsRef) -> Result { - serde_json::from_str(s.as_ref()) - .context(DeserializeCatalogEntryValueSnafu { raw: s.as_ref() }) - } - - pub fn as_bytes(&self) -> Result, Error> { - Ok(serde_json::to_string(self) - .context(SerializeCatalogEntryValueSnafu)? - .into_bytes()) - } } pub struct CatalogKey { pub catalog_name: String, - pub node_id: u64, } impl Display for CatalogKey { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(CATALOG_KEY_PREFIX)?; f.write_str("-")?; - f.write_str(&self.catalog_name)?; - f.write_str("-")?; - f.serialize_u64(self.node_id) + f.write_str(&self.catalog_name) } } @@ -140,15 +194,9 @@ impl CatalogKey { let captures = CATALOG_KEY_PATTERN .captures(key) .context(InvalidCatalogSnafu { key })?; - ensure!(captures.len() == 3, InvalidCatalogSnafu { key }); - - let node_id_str = captures[2].to_string(); - let node_id = u64::from_str(&node_id_str) - .map_err(|_| ParseNodeIdSnafu { key: node_id_str }.build())?; - + ensure!(captures.len() == 2, InvalidCatalogSnafu { key }); Ok(Self { catalog_name: captures[1].to_string(), - node_id, }) } } @@ -156,18 +204,9 @@ impl CatalogKey { #[derive(Debug, Serialize, Deserialize)] pub struct CatalogValue; -impl CatalogValue { - pub fn to_bytes(&self) -> Result, Error> { - Ok(serde_json::to_string(self) - .context(SerializeCatalogEntryValueSnafu)? - .into_bytes()) - } -} - pub struct SchemaKey { pub catalog_name: String, pub schema_name: String, - pub node_id: u64, } impl Display for SchemaKey { @@ -176,9 +215,7 @@ impl Display for SchemaKey { f.write_str("-")?; f.write_str(&self.catalog_name)?; f.write_str("-")?; - f.write_str(&self.schema_name)?; - f.write_str("-")?; - f.serialize_u64(self.node_id) + f.write_str(&self.schema_name) } } @@ -188,16 +225,10 @@ impl SchemaKey { let captures = SCHEMA_KEY_PATTERN .captures(key) .context(InvalidCatalogSnafu { key })?; - ensure!(captures.len() == 4, InvalidCatalogSnafu { key }); - - let node_id_str = captures[3].to_string(); - let node_id = u64::from_str(&node_id_str) - .map_err(|_| ParseNodeIdSnafu { key: node_id_str }.build())?; - + ensure!(captures.len() == 3, InvalidCatalogSnafu { key }); Ok(Self { catalog_name: captures[1].to_string(), schema_name: captures[2].to_string(), - node_id, }) } } @@ -205,14 +236,32 @@ impl SchemaKey { #[derive(Debug, Serialize, Deserialize)] pub struct SchemaValue; -impl SchemaValue { - pub fn to_bytes(&self) -> Result, Error> { - Ok(serde_json::to_string(self) - .context(SerializeCatalogEntryValueSnafu)? - .into_bytes()) - } +macro_rules! define_catalog_value { + ( $($val_ty: ty), *) => { + $( + impl $val_ty { + pub fn parse(s: impl AsRef) -> Result { + serde_json::from_str(s.as_ref()) + .context(DeserializeCatalogEntryValueSnafu { raw: s.as_ref() }) + } + + pub fn as_bytes(&self) -> Result, Error> { + Ok(serde_json::to_string(self) + .context(SerializeCatalogEntryValueSnafu)? + .into_bytes()) + } + } + )* + } } +define_catalog_value!( + TableRegionalValue, + TableGlobalValue, + CatalogValue, + SchemaValue +); + #[cfg(test)] mod tests { use datatypes::prelude::ConcreteDataType; @@ -222,32 +271,28 @@ mod tests { #[test] fn test_parse_catalog_key() { - let key = "__c-C-2"; + let key = "__c-C"; let catalog_key = CatalogKey::parse(key).unwrap(); assert_eq!("C", catalog_key.catalog_name); - assert_eq!(2, catalog_key.node_id); assert_eq!(key, catalog_key.to_string()); } #[test] fn test_parse_schema_key() { - let key = "__s-C-S-3"; + let key = "__s-C-S"; let schema_key = SchemaKey::parse(key).unwrap(); assert_eq!("C", schema_key.catalog_name); assert_eq!("S", schema_key.schema_name); - assert_eq!(3, schema_key.node_id); assert_eq!(key, schema_key.to_string()); } #[test] fn test_parse_table_key() { - let key = "__t-C-S-T-42-1"; - let entry = TableKey::parse(key).unwrap(); + let key = "__tg-C-S-T"; + let entry = TableGlobalKey::parse(key).unwrap(); assert_eq!("C", entry.catalog_name); assert_eq!("S", entry.schema_name); assert_eq!("T", entry.table_name); - assert_eq!(1, entry.node_id); - assert_eq!(42, entry.version); assert_eq!(key, &entry.to_string()); } @@ -256,8 +301,8 @@ mod tests { assert_eq!("__c-", build_catalog_prefix()); assert_eq!("__s-CATALOG-", build_schema_prefix("CATALOG")); assert_eq!( - "__t-CATALOG-SCHEMA-", - build_table_prefix("CATALOG", "SCHEMA") + "__tg-CATALOG-SCHEMA-", + build_table_global_prefix("CATALOG", "SCHEMA") ); } @@ -281,14 +326,15 @@ mod tests { region_numbers: vec![1], }; - let value = TableValue { + let value = TableGlobalValue { id: 42, - node_id: 32, - regions_ids: vec![1, 2, 3], + node_id: 0, + regions_id_map: HashMap::from([(0, vec![1, 2, 3])]), meta, + partition_rules: "{}".to_string(), }; let serialized = serde_json::to_string(&value).unwrap(); - let deserialized = TableValue::parse(&serialized).unwrap(); + let deserialized = TableGlobalValue::parse(&serialized).unwrap(); assert_eq!(value, deserialized); } } diff --git a/src/common/catalog/src/lib.rs b/src/common/catalog/src/lib.rs index 3f40c38d46..eb016c2dd0 100644 --- a/src/common/catalog/src/lib.rs +++ b/src/common/catalog/src/lib.rs @@ -3,6 +3,7 @@ pub mod error; mod helper; pub use helper::{ - build_catalog_prefix, build_schema_prefix, build_table_prefix, CatalogKey, CatalogValue, - SchemaKey, SchemaValue, TableKey, TableValue, + build_catalog_prefix, build_schema_prefix, build_table_global_prefix, + build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, + TableGlobalValue, TableRegionalKey, TableRegionalValue, }; diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index a24ff02274..89d4b4dcee 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -65,7 +65,11 @@ impl Instance { insert_batches: &[InsertBatch], ) -> Result<()> { // Create table automatically, build schema from data. - let table_id = self.catalog_manager.next_table_id(); + let table_id = self + .catalog_manager + .next_table_id() + .await + .context(CatalogSnafu)?; let create_table_request = insert::build_create_table_request( catalog_name, schema_name, diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 28698ab00a..c1c1ca9f6d 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -10,7 +10,7 @@ use servers::query_handler::SqlQueryHandler; use snafu::prelude::*; use sql::statements::statement::Statement; -use crate::error::{ExecuteSqlSnafu, Result}; +use crate::error::{CatalogSnafu, ExecuteSqlSnafu, Result}; use crate::instance::Instance; use crate::metric; use crate::sql::SqlRequest; @@ -49,7 +49,11 @@ impl Instance { } Statement::Create(c) => { - let table_id = self.catalog_manager.next_table_id(); + let table_id = self + .catalog_manager + .next_table_id() + .await + .context(CatalogSnafu)?; let _engine_name = c.engine.clone(); // TODO(hl): Select table engine by engine_name diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 87e6e161a2..7f12d25ff1 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -11,14 +11,14 @@ use futures::TryFutureExt; use snafu::prelude::*; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest}; -use crate::error::{self, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result}; +use crate::error::{self, CatalogSnafu, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result}; use crate::instance::Instance; use crate::server::grpc::handler::AdminResultBuilder; use crate::sql::SqlRequest; impl Instance { pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult { - let request = self.create_expr_to_request(expr); + let request = self.create_expr_to_request(expr).await; let result = futures::future::ready(request) .and_then(|request| self.sql_handler().execute(SqlRequest::Create(request))) .await; @@ -63,7 +63,7 @@ impl Instance { } } - fn create_expr_to_request(&self, expr: CreateExpr) -> Result { + async fn create_expr_to_request(&self, expr: CreateExpr) -> Result { let schema = create_table_schema(&expr)?; let primary_key_indices = expr @@ -76,8 +76,6 @@ impl Instance { }) .collect::>>()?; - let table_id = self.catalog_manager().next_table_id(); - let catalog_name = expr .catalog_name .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()); @@ -85,6 +83,12 @@ impl Instance { .schema_name .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); + let table_id = self + .catalog_manager() + .next_table_id() + .await + .context(CatalogSnafu)?; + let region_id = expr .table_options .get(&"region_id".to_string()) @@ -196,7 +200,7 @@ mod tests { instance.start().await.unwrap(); let expr = testing_create_expr(); - let request = instance.create_expr_to_request(expr).unwrap(); + let request = instance.create_expr_to_request(expr).await.unwrap(); assert_eq!(request.id, common_catalog::consts::MIN_USER_TABLE_ID); assert_eq!(request.catalog_name, "greptime".to_string()); assert_eq!(request.schema_name, "public".to_string()); @@ -208,7 +212,7 @@ mod tests { let mut expr = testing_create_expr(); expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; - let result = instance.create_expr_to_request(expr); + let result = instance.create_expr_to_request(expr).await; assert!(result.is_err()); assert!(result .unwrap_err()