diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index e06122b62f..027c6a5896 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -62,17 +62,20 @@ use std::sync::Arc; use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue}; use lazy_static::lazy_static; use regex::Regex; -use snafu::ResultExt; +use snafu::{ensure, OptionExt, ResultExt}; +use table::metadata::{RawTableInfo, TableId}; use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; use table_region::{TableRegionKey, TableRegionManager, TableRegionValue}; use self::catalog_name::{CatalogManager, CatalogNameValue}; use self::schema_name::{SchemaManager, SchemaNameValue}; -use self::table_route::TableRouteValue; -use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; +use self::table_route::{TableRouteManager, TableRouteValue}; +use crate::error::{self, Error, InvalidTableMetadataSnafu, Result, SerdeJsonSnafu}; pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX}; +use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; +use crate::rpc::router::{region_distribution, RegionRoute}; pub const REMOVED_PREFIX: &str = "__removed"; @@ -130,6 +133,22 @@ pub struct TableMetadataManager { datanode_table_manager: DatanodeTableManager, catalog_manager: CatalogManager, schema_manager: SchemaManager, + table_route_manager: TableRouteManager, + kv_backend: KvBackendRef, +} + +macro_rules! ensure_values { + ($got:expr, $expected_value:expr, $name:expr) => { + ensure!( + $got == $expected_value, + error::UnexpectedSnafu { + err_msg: format!( + "Reads the different value: {:?} during {}, expected: {:?}", + $got, $name, $expected_value + ) + } + ); + }; } impl TableMetadataManager { @@ -140,7 +159,9 @@ impl TableMetadataManager { table_region_manager: TableRegionManager::new(kv_backend.clone()), datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()), catalog_manager: CatalogManager::new(kv_backend.clone()), - schema_manager: SchemaManager::new(kv_backend), + schema_manager: SchemaManager::new(kv_backend.clone()), + table_route_manager: TableRouteManager::new(kv_backend.clone()), + kv_backend, } } @@ -167,6 +188,255 @@ impl TableMetadataManager { pub fn schema_manager(&self) -> &SchemaManager { &self.schema_manager } + + pub fn table_route_manager(&self) -> &TableRouteManager { + &self.table_route_manager + } + + /// Creates metadata for table and returns an error if different metadata exists. + /// The caller MUST ensure it has the exclusive access to `TableNameKey`. + pub async fn create_table_metadata( + &self, + table_info: RawTableInfo, + region_routes: Vec, + ) -> Result<()> { + let table_id = table_info.ident.table_id; + + // Creates table name. + let table_name = TableNameKey::new( + &table_info.catalog_name, + &table_info.schema_name, + &table_info.name, + ); + let create_table_name_txn = self + .table_name_manager() + .build_create_txn(&table_name, table_id)?; + + // Creates table info. + let table_info_value = TableInfoValue::new(table_info); + let (create_table_info_txn, on_create_table_info_failure) = self + .table_info_manager() + .build_create_txn(table_id, &table_info_value)?; + + // Creates datanode table key value pairs. + let distribution = region_distribution(®ion_routes)?; + let create_datanode_table_txn = self + .datanode_table_manager() + .build_create_txn(table_id, distribution)?; + + // Creates table route. + let table_route_value = TableRouteValue::new(region_routes); + let (create_table_route_txn, on_create_table_route_failure) = self + .table_route_manager() + .build_create_txn(table_id, &table_route_value)?; + + let txn = Txn::merge_all(vec![ + create_table_name_txn, + create_table_info_txn, + create_datanode_table_txn, + create_table_route_txn, + ]); + + let r = self.kv_backend.txn(txn).await?; + + // Checks whether metadata was already created. + if !r.succeeded { + let remote_table_info = + on_create_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu { + err_msg: "Reads the empty table info during the create table metadata", + })?; + + let remote_table_route = + on_create_table_route_failure(&r.responses)?.context(error::UnexpectedSnafu { + err_msg: "Reads the empty table route during the create table metadata", + })?; + + let op_name = "the creating table metadata"; + ensure_values!(remote_table_info, table_info_value, op_name); + ensure_values!(remote_table_route, table_route_value, op_name); + } + + Ok(()) + } + + /// Deletes metadata for table. + /// The caller MUST ensure it has the exclusive access to `TableNameKey`. + pub async fn delete_table_metadata( + &self, + table_info_value: TableInfoValue, + region_routes: Vec, + ) -> Result<()> { + let table_info = &table_info_value.table_info; + let table_id = table_info.ident.table_id; + + // Deletes table name. + let table_name = TableNameKey::new( + &table_info.catalog_name, + &table_info.schema_name, + &table_info.name, + ); + + let delete_table_name_txn = self + .table_name_manager() + .build_delete_txn(&table_name, table_id)?; + + // Deletes table info. + let delete_table_info_txn = self + .table_info_manager() + .build_delete_txn(table_id, &table_info_value)?; + + // Deletes datanode table key value pairs. + let distribution = region_distribution(®ion_routes)?; + let delete_datanode_txn = self + .datanode_table_manager() + .build_delete_txn(table_id, distribution)?; + + // Deletes table route. + let table_route_value = TableRouteValue::new(region_routes); + let delete_table_route_txn = self + .table_route_manager() + .build_delete_txn(table_id, &table_route_value)?; + + let txn = Txn::merge_all(vec![ + delete_table_name_txn, + delete_table_info_txn, + delete_datanode_txn, + delete_table_route_txn, + ]); + + // It's always successes. + let _ = self.kv_backend.txn(txn).await?; + + Ok(()) + } + + /// Renames the table name and returns an error if different metadata exists. + /// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s, + /// and the new `TableNameKey` MUST be empty. + pub async fn rename_table( + &self, + current_table_info_value: TableInfoValue, + new_table_name: String, + ) -> Result<()> { + let current_table_info = ¤t_table_info_value.table_info; + let table_id = current_table_info.ident.table_id; + + let table_name_key = TableNameKey::new( + ¤t_table_info.catalog_name, + ¤t_table_info.schema_name, + ¤t_table_info.name, + ); + + let new_table_name_key = TableNameKey::new( + ¤t_table_info.catalog_name, + ¤t_table_info.schema_name, + &new_table_name, + ); + + // Updates table name. + let update_table_name_txn = self.table_name_manager().build_update_txn( + &table_name_key, + &new_table_name_key, + table_id, + )?; + + let mut new_table_info_value = current_table_info_value.clone(); + new_table_info_value.table_info.name = new_table_name; + + // Updates table info. + let (update_table_info_txn, on_update_table_info_failure) = self + .table_info_manager() + .build_update_txn(table_id, ¤t_table_info_value, &new_table_info_value)?; + + let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]); + + let r = self.kv_backend.txn(txn).await?; + + // Checks whether metadata was already updated. + if !r.succeeded { + let remote_table_info = + on_update_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu { + err_msg: "Reads the empty table info during the rename table metadata", + })?; + + let op_name = "the renaming table metadata"; + ensure_values!(remote_table_info, new_table_info_value, op_name); + } + + Ok(()) + } + + /// Updates table info and returns an error if different metadata exists. + pub async fn update_table_info( + &self, + current_table_info_value: TableInfoValue, + new_table_info: RawTableInfo, + ) -> Result<()> { + let table_id = current_table_info_value.table_info.ident.table_id; + + let new_table_info_value = current_table_info_value.update(new_table_info); + + // Updates table info. + let (update_table_info_txn, on_update_table_info_failure) = self + .table_info_manager() + .build_update_txn(table_id, ¤t_table_info_value, &new_table_info_value)?; + + let r = self.kv_backend.txn(update_table_info_txn).await?; + + // Checks whether metadata was already updated. + if !r.succeeded { + let remote_table_info = + on_update_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu { + err_msg: "Reads the empty table info during the updating table info", + })?; + + let op_name = "the updating table info"; + ensure_values!(remote_table_info, new_table_info_value, op_name); + } + Ok(()) + } + + pub async fn update_table_route( + &self, + table_id: TableId, + current_table_route_value: TableRouteValue, + new_region_routes: Vec, + ) -> Result<()> { + // Updates the datanode table key value pairs. + let current_region_distribution = + region_distribution(¤t_table_route_value.region_routes)?; + let new_region_distribution = region_distribution(&new_region_routes)?; + + let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( + table_id, + current_region_distribution, + new_region_distribution, + )?; + + // Updates the table_route. + let new_table_route_value = current_table_route_value.update(new_region_routes); + + let (update_table_route_txn, on_update_table_route_failure) = self + .table_route_manager() + .build_update_txn(table_id, ¤t_table_route_value, &new_table_route_value)?; + + let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]); + + let r = self.kv_backend.txn(txn).await?; + + // Checks whether metadata was already updated. + if !r.succeeded { + let remote_table_route = + on_update_table_route_failure(&r.responses)?.context(error::UnexpectedSnafu { + err_msg: "Reads the empty table route during the updating table route", + })?; + + let op_name = "the updating table route"; + ensure_values!(remote_table_route, new_table_route_value, op_name); + } + + Ok(()) + } } macro_rules! impl_table_meta_key { @@ -192,6 +462,10 @@ macro_rules! impl_table_meta_value { ($($val_ty: ty), *) => { $( impl $val_ty { + pub fn try_from_raw_value_ref(raw_value: &[u8]) -> Result { + serde_json::from_slice(raw_value).context(SerdeJsonSnafu) + } + pub fn try_from_raw_value(raw_value: Vec) -> Result { let raw_value = String::from_utf8(raw_value).map_err(|e| { InvalidTableMetadataSnafu { err_msg: e.to_string() }.build() @@ -200,15 +474,28 @@ macro_rules! impl_table_meta_value { } pub fn try_as_raw_value(&self) -> Result> { - serde_json::to_string(self) - .map(|x| x.into_bytes()) - .context(SerdeJsonSnafu) + serde_json::to_vec(self).context(SerdeJsonSnafu) } } )* } } +macro_rules! impl_try_from { + ($($val_ty: ty), *) => { + $( + impl<'a> TryFrom<&'a Vec> for $val_ty { + type Error = Error; + fn try_from(value: &'a Vec) -> Result { + serde_json::from_slice(value).context(SerdeJsonSnafu) + } + } + )* + }; +} + +impl_try_from! {TableInfoValue, TableRouteValue} + impl_table_meta_value! { CatalogNameValue, SchemaNameValue, @@ -221,7 +508,20 @@ impl_table_meta_value! { #[cfg(test)] mod tests { - use crate::key::to_removed_key; + use std::collections::BTreeMap; + use std::sync::Arc; + + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, SchemaBuilder}; + use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder}; + + use super::datanode_table::DatanodeTableKey; + use crate::key::table_info::TableInfoValue; + use crate::key::table_route::TableRouteValue; + use crate::key::{to_removed_key, TableMetadataManager}; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::peer::Peer; + use crate::rpc::router::{region_distribution, Region, RegionRoute}; #[test] fn test_to_removed_key() { @@ -229,4 +529,261 @@ mod tests { let removed = "__removed-test_key"; assert_eq!(removed, to_removed_key(key)); } + + fn new_test_table_info() -> TableInfo { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), + ]; + let schema = SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(123) + .build() + .unwrap(); + + let meta = TableMetaBuilder::default() + .schema(Arc::new(schema)) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .build() + .unwrap(); + TableInfoBuilder::default() + .table_id(10) + .table_version(5) + .name("mytable") + .meta(meta) + .build() + .unwrap() + } + + fn new_test_region_route() -> RegionRoute { + new_region_route(1, 2) + } + + fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute { + RegionRoute { + region: Region { + id: region_id.into(), + name: "r1".to_string(), + partition: None, + attrs: BTreeMap::new(), + }, + leader_peer: Some(Peer::new(datanode, "a2")), + follower_peers: vec![], + } + } + + #[tokio::test] + async fn test_create_table_metadata() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv); + let table_info: RawTableInfo = new_test_table_info().into(); + let region_route = new_test_region_route(); + let region_routes = vec![region_route.clone()]; + // creates metadata. + table_metadata_manager + .create_table_metadata(table_info.clone(), region_routes.clone()) + .await + .unwrap(); + // if metadata was already created, it should be ok. + table_metadata_manager + .create_table_metadata(table_info.clone(), region_routes.clone()) + .await + .unwrap(); + let mut modified_region_routes = region_routes.clone(); + modified_region_routes.push(region_route); + // if remote metadata was exists, it should return an error. + assert!(table_metadata_manager + .create_table_metadata(table_info, modified_region_routes) + .await + .is_err()); + } + + #[tokio::test] + async fn test_delete_table_metadata() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv); + let table_info: RawTableInfo = new_test_table_info().into(); + let region_route = new_test_region_route(); + let region_routes = vec![region_route.clone()]; + // creates metadata. + table_metadata_manager + .create_table_metadata(table_info.clone(), region_routes.clone()) + .await + .unwrap(); + let table_info_value = TableInfoValue::new(table_info); + // deletes metadata. + table_metadata_manager + .delete_table_metadata(table_info_value.clone(), region_routes.clone()) + .await + .unwrap(); + // if metadata was already deleted, it should be ok. + table_metadata_manager + .delete_table_metadata(table_info_value.clone(), region_routes.clone()) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_rename_table() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv); + let table_info: RawTableInfo = new_test_table_info().into(); + let region_route = new_test_region_route(); + let region_routes = vec![region_route.clone()]; + // creates metadata. + table_metadata_manager + .create_table_metadata(table_info.clone(), region_routes.clone()) + .await + .unwrap(); + let new_table_name = "another_name".to_string(); + let table_info_value = TableInfoValue::new(table_info.clone()); + table_metadata_manager + .rename_table(table_info_value.clone(), new_table_name.clone()) + .await + .unwrap(); + // if remote metadata was updated, it should be ok. + table_metadata_manager + .rename_table(table_info_value.clone(), new_table_name.clone()) + .await + .unwrap(); + let mut modified_table_info = table_info; + modified_table_info.name = "hi".to_string(); + let modified_table_info_value = table_info_value.update(modified_table_info); + // if the table_info_value is wrong, it should return an error. + // The ABA problem. + assert!(table_metadata_manager + .rename_table(modified_table_info_value.clone(), new_table_name.clone()) + .await + .is_err()) + } + + #[tokio::test] + async fn test_update_table_info() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv); + let table_info: RawTableInfo = new_test_table_info().into(); + let region_route = new_test_region_route(); + let region_routes = vec![region_route.clone()]; + // creates metadata. + table_metadata_manager + .create_table_metadata(table_info.clone(), region_routes.clone()) + .await + .unwrap(); + let mut new_table_info = table_info.clone(); + new_table_info.name = "hi".to_string(); + let current_table_info_value = TableInfoValue::new(table_info.clone()); + // should be ok. + table_metadata_manager + .update_table_info(current_table_info_value.clone(), new_table_info.clone()) + .await + .unwrap(); + // if table info was updated, it should be ok. + table_metadata_manager + .update_table_info(current_table_info_value.clone(), new_table_info.clone()) + .await + .unwrap(); + let mut wrong_table_info = table_info.clone(); + wrong_table_info.name = "wrong".to_string(); + let wrong_table_info_value = current_table_info_value.update(wrong_table_info); + // if the current_table_info_value is wrong, it should return an error. + // The ABA problem. + assert!(table_metadata_manager + .update_table_info(wrong_table_info_value, new_table_info) + .await + .is_err()) + } + + async fn assert_datanode_table( + table_metadata_manager: &TableMetadataManager, + table_id: u32, + region_routes: &[RegionRoute], + ) { + let region_distribution = region_distribution(region_routes).unwrap(); + for (datanode, regions) in region_distribution { + let got = table_metadata_manager + .datanode_table_manager() + .get(&DatanodeTableKey::new(datanode, table_id)) + .await + .unwrap() + .unwrap(); + + assert_eq!(got.regions, regions) + } + } + + #[tokio::test] + async fn test_update_table_route() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv); + let table_info: RawTableInfo = new_test_table_info().into(); + let region_route = new_test_region_route(); + let region_routes = vec![region_route.clone()]; + let table_id = table_info.ident.table_id; + let current_table_route_value = TableRouteValue::new(region_routes.clone()); + // creates metadata. + table_metadata_manager + .create_table_metadata(table_info.clone(), region_routes.clone()) + .await + .unwrap(); + assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await; + let new_region_routes = vec![ + new_region_route(1, 1), + new_region_route(2, 2), + new_region_route(3, 3), + ]; + // it should be ok. + table_metadata_manager + .update_table_route( + table_id, + current_table_route_value.clone(), + new_region_routes.clone(), + ) + .await + .unwrap(); + assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await; + + // if the table route was updated. it should be ok. + table_metadata_manager + .update_table_route( + table_id, + current_table_route_value.clone(), + new_region_routes.clone(), + ) + .await + .unwrap(); + + let current_table_route_value = current_table_route_value.update(new_region_routes.clone()); + let new_region_routes = vec![new_region_route(4, 4), new_region_route(5, 5)]; + // it should be ok. + table_metadata_manager + .update_table_route( + table_id, + current_table_route_value.clone(), + new_region_routes.clone(), + ) + .await + .unwrap(); + assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await; + + // if the current_table_route_value is wrong, it should return an error. + // The ABA problem. + let wrong_table_route_value = current_table_route_value.update(vec![ + new_region_route(1, 1), + new_region_route(2, 2), + new_region_route(3, 3), + new_region_route(4, 4), + ]); + assert!(table_metadata_manager + .update_table_route(table_id, wrong_table_route_value, new_region_routes) + .await + .is_err()); + } } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 3f621800ba..ea77b102f2 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -17,6 +17,7 @@ use snafu::{ensure, OptionExt}; use store_api::storage::RegionNumber; use table::metadata::TableId; +use super::table_region::RegionDistribution; use super::{DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX}; use crate::error::{InvalidTableMetadataSnafu, MoveRegionSnafu, Result, UnexpectedSnafu}; use crate::key::{to_removed_key, TableMetaKey}; @@ -105,6 +106,88 @@ impl DatanodeTableManager { .transpose() } + /// Builds the create datanode table transactions. It only executes while the primary keys comparing successes. + pub fn build_create_txn( + &self, + table_id: TableId, + distribution: RegionDistribution, + ) -> Result { + let txns = distribution + .into_iter() + .map(|(datanode_id, regions)| { + let key = DatanodeTableKey::new(datanode_id, table_id); + let val = DatanodeTableValue::new(table_id, regions); + + Ok(TxnOp::Put(key.as_raw_key(), val.try_as_raw_value()?)) + }) + .collect::>>()?; + + let txn = Txn::default().and_then(txns); + + Ok(txn) + } + + /// Builds the update datanode table transactions. It only executes while the primary keys comparing successes. + pub(crate) fn build_update_txn( + &self, + table_id: TableId, + current_region_distribution: RegionDistribution, + new_region_distribution: RegionDistribution, + ) -> Result { + let mut opts = Vec::new(); + + // Removes the old datanode table key value pairs + for current_datanode in current_region_distribution.keys() { + if !new_region_distribution.contains_key(current_datanode) { + let key = DatanodeTableKey::new(*current_datanode, table_id); + let raw_key = key.as_raw_key(); + opts.push(TxnOp::Delete(raw_key)) + } + } + + for (datanode, regions) in new_region_distribution.into_iter() { + if let Some(current_region) = current_region_distribution.get(&datanode) { + // Updates if need. + if *current_region != regions { + let key = DatanodeTableKey::new(datanode, table_id); + let raw_key = key.as_raw_key(); + let val = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?; + opts.push(TxnOp::Put(raw_key, val)); + } + } else { + // New datanodes + let key = DatanodeTableKey::new(datanode, table_id); + let raw_key = key.as_raw_key(); + let val = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?; + opts.push(TxnOp::Put(raw_key, val)); + } + } + + let txn = Txn::default().and_then(opts); + Ok(txn) + } + + /// Builds the delete datanode table transactions. It only executes while the primary keys comparing successes. + pub fn build_delete_txn( + &self, + table_id: TableId, + distribution: RegionDistribution, + ) -> Result { + let txns = distribution + .into_keys() + .map(|datanode_id| { + let key = DatanodeTableKey::new(datanode_id, table_id); + let raw_key = key.as_raw_key(); + + Ok(TxnOp::Delete(raw_key)) + }) + .collect::>>()?; + + let txn = Txn::default().and_then(txns); + + Ok(txn) + } + /// Create DatanodeTable key and value. If the key already exists, check if the value is the same. pub async fn create( &self, diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index a3969c5d4e..d19f1ca8f0 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -19,6 +19,7 @@ use table::metadata::{RawTableInfo, TableId}; use super::TABLE_INFO_KEY_PREFIX; use crate::error::{Result, UnexpectedSnafu}; use crate::key::{to_removed_key, TableMetaKey}; +use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest}; @@ -51,6 +52,13 @@ impl TableInfoValue { version: 0, } } + + pub(crate) fn update(&self, new_table_info: RawTableInfo) -> Self { + Self { + table_info: new_table_info, + version: self.version + 1, + } + } } pub struct TableInfoManager { @@ -62,6 +70,100 @@ impl TableInfoManager { Self { kv_backend } } + /// Builds a create table info transaction, it expected the `__table_info/{table_id}` wasn't occupied. + pub(crate) fn build_create_txn( + &self, + table_id: TableId, + table_info_value: &TableInfoValue, + ) -> Result<( + Txn, + impl FnOnce(&Vec) -> Result>, + )> { + let key = TableInfoKey::new(table_id); + let raw_key = key.as_raw_key(); + + let txn = Txn::default() + .when(vec![Compare::with_not_exist_value( + raw_key.clone(), + CompareOp::Equal, + )]) + .and_then(vec![TxnOp::Put( + raw_key.clone(), + table_info_value.try_as_raw_value()?, + )]) + .or_else(vec![TxnOp::Get(raw_key.clone())]); + + Ok((txn, Self::build_decode_fn(raw_key))) + } + + /// Builds a update table info transaction, it expected the remote value equals the `current_current_table_info_value`. + /// It retrieves the latest value if the comparing failed. + pub(crate) fn build_update_txn( + &self, + table_id: TableId, + current_table_info_value: &TableInfoValue, + new_table_info_value: &TableInfoValue, + ) -> Result<( + Txn, + impl FnOnce(&Vec) -> Result>, + )> { + let key = TableInfoKey::new(table_id); + let raw_key = key.as_raw_key(); + let raw_value = current_table_info_value.try_as_raw_value()?; + + let txn = Txn::default() + .when(vec![Compare::with_value( + raw_key.clone(), + CompareOp::Equal, + raw_value.clone(), + )]) + .and_then(vec![TxnOp::Put( + raw_key.clone(), + new_table_info_value.try_as_raw_value()?, + )]) + .or_else(vec![TxnOp::Get(raw_key.clone())]); + + Ok((txn, Self::build_decode_fn(raw_key))) + } + + /// Builds a delete table info transaction. + pub(crate) fn build_delete_txn( + &self, + table_id: TableId, + table_info_value: &TableInfoValue, + ) -> Result { + let key = TableInfoKey::new(table_id); + let raw_key = key.as_raw_key(); + let raw_value = table_info_value.try_as_raw_value()?; + let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); + + let txn = Txn::default().and_then(vec![ + TxnOp::Delete(raw_key.clone()), + TxnOp::Put(removed_key.into_bytes(), raw_value), + ]); + + Ok(txn) + } + + fn build_decode_fn( + raw_key: Vec, + ) -> impl FnOnce(&Vec) -> Result> { + move |kvs: &Vec| { + kvs.iter() + .filter_map(|resp| { + if let TxnOpResponse::ResponseGet(r) = resp { + Some(r) + } else { + None + } + }) + .flat_map(|r| &r.kvs) + .find(|kv| kv.key == raw_key) + .map(|kv| TableInfoValue::try_from(&kv.value)) + .transpose() + } + } + pub async fn get(&self, table_id: TableId) -> Result> { let key = TableInfoKey::new(table_id); let raw_key = key.as_raw_key(); diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 3b729ab20e..c64fa2d988 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -150,6 +150,59 @@ impl TableNameManager { Self { kv_backend } } + /// Builds a create table name transaction. It only executes while the primary keys comparing successes. + pub(crate) fn build_create_txn( + &self, + key: &TableNameKey<'_>, + table_id: TableId, + ) -> Result { + let raw_key = key.as_raw_key(); + let value = TableNameValue::new(table_id); + let raw_value = value.try_as_raw_value()?; + + let txn = Txn::default().and_then(vec![TxnOp::Put(raw_key, raw_value)]); + + Ok(txn) + } + + /// Builds a update table name transaction. It only executes while the primary keys comparing successes. + pub(crate) fn build_update_txn( + &self, + key: &TableNameKey<'_>, + new_key: &TableNameKey<'_>, + table_id: TableId, + ) -> Result { + let raw_key = key.as_raw_key(); + let new_raw_key = new_key.as_raw_key(); + let value = TableNameValue::new(table_id); + let raw_value = value.try_as_raw_value()?; + + let txn = Txn::default().and_then(vec![ + TxnOp::Delete(raw_key), + TxnOp::Put(new_raw_key, raw_value), + ]); + Ok(txn) + } + + /// Builds a delete table name transaction. It only executes while the primary keys comparing successes. + pub(crate) fn build_delete_txn( + &self, + key: &TableNameKey<'_>, + table_id: TableId, + ) -> Result { + let raw_key = key.as_raw_key(); + let value = TableNameValue::new(table_id); + let raw_value = value.try_as_raw_value()?; + let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); + + let txn = Txn::default().and_then(vec![ + TxnOp::Delete(raw_key), + TxnOp::Put(removed_key.into_bytes(), raw_value), + ]); + + Ok(txn) + } + /// Create TableName key and value. If the key already exists, check if the value is the same. pub async fn create(&self, key: &TableNameKey<'_>, table_id: TableId) -> Result<()> { let raw_key = key.as_raw_key(); diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index b4a06d0b2c..8f1f656b1a 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -15,15 +15,19 @@ use std::fmt::Display; use api::v1::meta::TableName; +use futures::future::try_join_all; +use serde::__private::de; use serde::{Deserialize, Serialize}; use snafu::ensure; use table::metadata::TableId; use crate::error::{Result, UnexpectedSnafu}; use crate::key::{to_removed_key, TableMetaKey}; +use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnRequest}; use crate::kv_backend::KvBackendRef; -use crate::rpc::router::{RegionRoute, Table, TableRoute}; -use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest}; +use crate::rpc::router::{region_distribution, RegionRoute, Table, TableRoute}; +use crate::rpc::store::{BatchGetRequest, CompareAndPutRequest, MoveValueRequest}; +use crate::rpc::KeyValue; pub const TABLE_ROUTE_PREFIX: &str = "__meta_table_route"; @@ -53,6 +57,13 @@ impl TableRouteValue { version: 0, } } + + pub fn update(&self, region_routes: Vec) -> Self { + Self { + region_routes, + version: self.version + 1, + } + } } impl TableMetaKey for NextTableRouteKey { @@ -76,6 +87,99 @@ impl TableRouteManager { Self { kv_backend } } + /// Builds a create table route transaction. it expected the `__table_route/{table_id}` wasn't occupied. + pub(crate) fn build_create_txn( + &self, + table_id: TableId, + table_route_value: &TableRouteValue, + ) -> Result<( + Txn, + impl FnOnce(&Vec) -> Result>, + )> { + let key = NextTableRouteKey::new(table_id); + let raw_key = key.as_raw_key(); + + let txn = Txn::default() + .when(vec![Compare::with_not_exist_value( + raw_key.clone(), + CompareOp::Equal, + )]) + .and_then(vec![TxnOp::Put( + raw_key.clone(), + table_route_value.try_as_raw_value()?, + )]) + .or_else(vec![TxnOp::Get(raw_key.clone())]); + + Ok((txn, Self::build_decode_fn(raw_key))) + } + + /// Builds a update table route transaction, it expected the remote value equals the `current_table_route_value`. + /// It retrieves the latest value if the comparing failed. + pub(crate) fn build_update_txn( + &self, + table_id: TableId, + current_table_route_value: &TableRouteValue, + new_table_route_value: &TableRouteValue, + ) -> Result<( + Txn, + impl FnOnce(&Vec) -> Result>, + )> { + let key = NextTableRouteKey::new(table_id); + let raw_key = key.as_raw_key(); + let raw_value = current_table_route_value.try_as_raw_value()?; + let new_raw_value: Vec = new_table_route_value.try_as_raw_value()?; + + let txn = Txn::default() + .when(vec![Compare::with_value( + raw_key.clone(), + CompareOp::Equal, + raw_value.clone(), + )]) + .and_then(vec![TxnOp::Put(raw_key.clone(), new_raw_value)]) + .or_else(vec![TxnOp::Get(raw_key.clone())]); + + Ok((txn, Self::build_decode_fn(raw_key))) + } + + /// Builds a delete table route transaction, it expected the remote value equals the `table_route_value`. + pub(crate) fn build_delete_txn( + &self, + table_id: TableId, + table_route_value: &TableRouteValue, + ) -> Result { + let key = NextTableRouteKey::new(table_id); + let raw_key = key.as_raw_key(); + let raw_value = table_route_value.try_as_raw_value()?; + let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); + + let txn = Txn::default().and_then(vec![ + TxnOp::Delete(raw_key), + TxnOp::Put(removed_key.into_bytes(), raw_value), + ]); + + Ok(txn) + } + + fn build_decode_fn( + raw_key: Vec, + ) -> impl FnOnce(&Vec) -> Result> { + move |response: &Vec| { + response + .iter() + .filter_map(|resp| { + if let TxnOpResponse::ResponseGet(r) = resp { + Some(r) + } else { + None + } + }) + .flat_map(|r| &r.kvs) + .find(|kv| kv.key == raw_key) + .map(|kv| TableRouteValue::try_from(&kv.value)) + .transpose() + } + } + pub async fn get(&self, table_id: TableId) -> Result> { let key = NextTableRouteKey::new(table_id); self.kv_backend diff --git a/src/common/meta/src/kv_backend/txn.rs b/src/common/meta/src/kv_backend/txn.rs index 4317ef5470..adf2b41a66 100644 --- a/src/common/meta/src/kv_backend/txn.rs +++ b/src/common/meta/src/kv_backend/txn.rs @@ -100,6 +100,14 @@ pub struct TxnRequest { pub failure: Vec, } +impl TxnRequest { + pub fn extend(&mut self, other: TxnRequest) { + self.compare.extend(other.compare); + self.success.extend(other.success); + self.failure.extend(other.failure); + } +} + #[derive(Debug, Clone, PartialEq)] pub enum TxnOpResponse { ResponsePut(PutResponse), @@ -121,6 +129,23 @@ pub struct Txn { } impl Txn { + pub fn merge_all>(values: T) -> Self { + values + .into_iter() + .reduce(|acc, e| acc.merge(e)) + .unwrap_or_default() + } + + pub fn merge(mut self, other: Txn) -> Self { + self.c_when |= other.c_when; + self.c_then |= other.c_then; + self.c_else |= other.c_else; + + self.req.extend(other.req); + + self + } + pub fn new() -> Self { Txn::default() } diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 71fe8ccb4c..cec604ca75 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -26,6 +26,7 @@ use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use crate::error::{self, Result}; +use crate::key::table_region::RegionDistribution; use crate::peer::Peer; use crate::rpc::util; use crate::table_name::TableName; @@ -73,6 +74,23 @@ impl TryFrom for RouteResponse { } } +pub(crate) fn region_distribution(region_routes: &[RegionRoute]) -> Result { + let mut regions_id_map = RegionDistribution::new(); + for route in region_routes.iter() { + let node_id = route + .leader_peer + .as_ref() + .context(error::UnexpectedSnafu { + err_msg: "leader not found", + })? + .id; + + let region_id = route.region.id.region_number(); + regions_id_map.entry(node_id).or_default().push(region_id); + } + Ok(regions_id_map) +} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] pub struct TableRoute { pub table: Table,