refactor: refactor table metadata manager (#2159)

* refactor: table-metadata-manager

* feat: remove comparing when deleting metadata

* fix: fix comment typos

* chore: apply suggestions from CR

* test: add tests for updating DatanodeTable

* fix: fix clippy

* chore: apply suggestions from CR

* refactor: improve update table route tests

* refactor: return Txn instead of TxnRequest

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-08-16 14:43:03 +08:00
committed by GitHub
parent 1afe96e397
commit 8ea1763033
7 changed files with 952 additions and 10 deletions

View File

@@ -62,17 +62,20 @@ use std::sync::Arc;
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use lazy_static::lazy_static;
use regex::Regex;
use snafu::ResultExt;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::{RawTableInfo, TableId};
use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};
use table_region::{TableRegionKey, TableRegionManager, TableRegionValue};
use self::catalog_name::{CatalogManager, CatalogNameValue};
use self::schema_name::{SchemaManager, SchemaNameValue};
use self::table_route::TableRouteValue;
use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu};
use self::table_route::{TableRouteManager, TableRouteValue};
use crate::error::{self, Error, InvalidTableMetadataSnafu, Result, SerdeJsonSnafu};
pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute};
pub const REMOVED_PREFIX: &str = "__removed";
@@ -130,6 +133,22 @@ pub struct TableMetadataManager {
datanode_table_manager: DatanodeTableManager,
catalog_manager: CatalogManager,
schema_manager: SchemaManager,
table_route_manager: TableRouteManager,
kv_backend: KvBackendRef,
}
/// Returns early with an `Unexpected` error unless `$got == $expected_value`.
///
/// `$name` is interpolated into the error message to identify the operation that
/// observed the mismatch; both values are rendered with `{:?}`.
macro_rules! ensure_values {
($got:expr, $expected_value:expr, $name:expr) => {
ensure!(
$got == $expected_value,
error::UnexpectedSnafu {
err_msg: format!(
"Reads the different value: {:?} during {}, expected: {:?}",
$got, $name, $expected_value
)
}
);
};
}
impl TableMetadataManager {
@@ -140,7 +159,9 @@ impl TableMetadataManager {
table_region_manager: TableRegionManager::new(kv_backend.clone()),
datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
catalog_manager: CatalogManager::new(kv_backend.clone()),
schema_manager: SchemaManager::new(kv_backend),
schema_manager: SchemaManager::new(kv_backend.clone()),
table_route_manager: TableRouteManager::new(kv_backend.clone()),
kv_backend,
}
}
@@ -167,6 +188,255 @@ impl TableMetadataManager {
pub fn schema_manager(&self) -> &SchemaManager {
&self.schema_manager
}
/// Returns a reference to the manager responsible for table route keys.
pub fn table_route_manager(&self) -> &TableRouteManager {
&self.table_route_manager
}
/// Creates all metadata for a table in a single merged transaction.
///
/// The transaction writes the table name, the table info, the per-datanode region
/// entries and the table route. If the transaction does not succeed, the remote table
/// info and table route are read back and compared with the values being created:
/// identical values are accepted (the metadata was already created), anything else is
/// returned as an `Unexpected` error.
///
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
pub async fn create_table_metadata(
&self,
table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
) -> Result<()> {
let table_id = table_info.ident.table_id;
// Creates table name.
let table_name = TableNameKey::new(
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name,
);
let create_table_name_txn = self
.table_name_manager()
.build_create_txn(&table_name, table_id)?;
// Creates table info.
let table_info_value = TableInfoValue::new(table_info);
let (create_table_info_txn, on_create_table_info_failure) = self
.table_info_manager()
.build_create_txn(table_id, &table_info_value)?;
// Creates datanode table key value pairs.
let distribution = region_distribution(&region_routes)?;
let create_datanode_table_txn = self
.datanode_table_manager()
.build_create_txn(table_id, distribution)?;
// Creates table route.
let table_route_value = TableRouteValue::new(region_routes);
let (create_table_route_txn, on_create_table_route_failure) = self
.table_route_manager()
.build_create_txn(table_id, &table_route_value)?;
// Merges the four transactions so they are submitted to the backend as one request.
let txn = Txn::merge_all(vec![
create_table_name_txn,
create_table_info_txn,
create_datanode_table_txn,
create_table_route_txn,
]);
let r = self.kv_backend.txn(txn).await?;
// Checks whether metadata was already created.
if !r.succeeded {
// The failure branch read the existing values back; decode them from the responses.
let remote_table_info =
on_create_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the create table metadata",
})?;
let remote_table_route =
on_create_table_route_failure(&r.responses)?.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the create table metadata",
})?;
let op_name = "the creating table metadata";
// Identical remote values mean a previous attempt already created the metadata.
ensure_values!(remote_table_info, table_info_value, op_name);
ensure_values!(remote_table_route, table_route_value, op_name);
}
Ok(())
}
/// Deletes all metadata for a table in a single merged transaction.
///
/// Removes the table name, the table info, the per-datanode region entries and the
/// table route that correspond to `table_info_value` and `region_routes`.
///
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
pub async fn delete_table_metadata(
&self,
table_info_value: TableInfoValue,
region_routes: Vec<RegionRoute>,
) -> Result<()> {
let table_info = &table_info_value.table_info;
let table_id = table_info.ident.table_id;
// Deletes table name.
let table_name = TableNameKey::new(
&table_info.catalog_name,
&table_info.schema_name,
&table_info.name,
);
let delete_table_name_txn = self
.table_name_manager()
.build_delete_txn(&table_name, table_id)?;
// Deletes table info.
let delete_table_info_txn = self
.table_info_manager()
.build_delete_txn(table_id, &table_info_value)?;
// Deletes datanode table key value pairs.
let distribution = region_distribution(&region_routes)?;
let delete_datanode_txn = self
.datanode_table_manager()
.build_delete_txn(table_id, distribution)?;
// Deletes table route.
let table_route_value = TableRouteValue::new(region_routes);
let delete_table_route_txn = self
.table_route_manager()
.build_delete_txn(table_id, &table_route_value)?;
let txn = Txn::merge_all(vec![
delete_table_name_txn,
delete_table_info_txn,
delete_datanode_txn,
delete_table_route_txn,
]);
// None of the delete transactions carries a compare condition, so the success
// branch always runs; the response is intentionally ignored.
let _ = self.kv_backend.txn(txn).await?;
Ok(())
}
/// Renames a table within its current catalog and schema.
///
/// Moves the table name key to `new_table_name` and replaces the table info with a copy
/// whose `name` is `new_table_name`, in one merged transaction. If the transaction does
/// not succeed, the remote table info is read back: a value equal to the renamed table
/// info is accepted (the rename already happened), anything else is returned as an
/// `Unexpected` error.
///
/// The caller MUST ensure it has the exclusive access to old and new `TableNameKey`s,
/// and the new `TableNameKey` MUST be empty.
pub async fn rename_table(
&self,
current_table_info_value: TableInfoValue,
new_table_name: String,
) -> Result<()> {
let current_table_info = &current_table_info_value.table_info;
let table_id = current_table_info.ident.table_id;
let table_name_key = TableNameKey::new(
&current_table_info.catalog_name,
&current_table_info.schema_name,
&current_table_info.name,
);
// The new key keeps the catalog and schema; only the table name differs.
let new_table_name_key = TableNameKey::new(
&current_table_info.catalog_name,
&current_table_info.schema_name,
&new_table_name,
);
// Updates table name.
let update_table_name_txn = self.table_name_manager().build_update_txn(
&table_name_key,
&new_table_name_key,
table_id,
)?;
// NOTE(review): the new value is a plain clone with only `name` changed, so its
// `version` equals the current one — unlike `update_table_info`, which goes through
// `TableInfoValue::update`. Confirm this is intended.
let mut new_table_info_value = current_table_info_value.clone();
new_table_info_value.table_info.name = new_table_name;
// Updates table info.
let (update_table_info_txn, on_update_table_info_failure) = self
.table_info_manager()
.build_update_txn(table_id, &current_table_info_value, &new_table_info_value)?;
let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
let r = self.kv_backend.txn(txn).await?;
// Checks whether metadata was already updated.
if !r.succeeded {
let remote_table_info =
on_update_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the rename table metadata",
})?;
let op_name = "the renaming table metadata";
ensure_values!(remote_table_info, new_table_info_value, op_name);
}
Ok(())
}
/// Updates table info and returns an error if different metadata exists.
pub async fn update_table_info(
&self,
current_table_info_value: TableInfoValue,
new_table_info: RawTableInfo,
) -> Result<()> {
let table_id = current_table_info_value.table_info.ident.table_id;
let new_table_info_value = current_table_info_value.update(new_table_info);
// Updates table info.
let (update_table_info_txn, on_update_table_info_failure) = self
.table_info_manager()
.build_update_txn(table_id, &current_table_info_value, &new_table_info_value)?;
let r = self.kv_backend.txn(update_table_info_txn).await?;
// Checks whether metadata was already updated.
if !r.succeeded {
let remote_table_info =
on_update_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the updating table info",
})?;
let op_name = "the updating table info";
ensure_values!(remote_table_info, new_table_info_value, op_name);
}
Ok(())
}
pub async fn update_table_route(
&self,
table_id: TableId,
current_table_route_value: TableRouteValue,
new_region_routes: Vec<RegionRoute>,
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
region_distribution(&current_table_route_value.region_routes)?;
let new_region_distribution = region_distribution(&new_region_routes)?;
let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
table_id,
current_region_distribution,
new_region_distribution,
)?;
// Updates the table_route.
let new_table_route_value = current_table_route_value.update(new_region_routes);
let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
.build_update_txn(table_id, &current_table_route_value, &new_table_route_value)?;
let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
let r = self.kv_backend.txn(txn).await?;
// Checks whether metadata was already updated.
if !r.succeeded {
let remote_table_route =
on_update_table_route_failure(&r.responses)?.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the updating table route",
})?;
let op_name = "the updating table route";
ensure_values!(remote_table_route, new_table_route_value, op_name);
}
Ok(())
}
}
macro_rules! impl_table_meta_key {
@@ -192,6 +462,10 @@ macro_rules! impl_table_meta_value {
($($val_ty: ty), *) => {
$(
impl $val_ty {
pub fn try_from_raw_value_ref(raw_value: &[u8]) -> Result<Self> {
serde_json::from_slice(raw_value).context(SerdeJsonSnafu)
}
pub fn try_from_raw_value(raw_value: Vec<u8>) -> Result<Self> {
let raw_value = String::from_utf8(raw_value).map_err(|e| {
InvalidTableMetadataSnafu { err_msg: e.to_string() }.build()
@@ -200,15 +474,28 @@ macro_rules! impl_table_meta_value {
}
pub fn try_as_raw_value(&self) -> Result<Vec<u8>> {
serde_json::to_string(self)
.map(|x| x.into_bytes())
.context(SerdeJsonSnafu)
serde_json::to_vec(self).context(SerdeJsonSnafu)
}
}
)*
}
}
/// Implements `TryFrom<&Vec<u8>>` for each listed value type by deserializing the
/// bytes as JSON; a deserialization failure is reported through `SerdeJsonSnafu`.
macro_rules! impl_try_from {
($($val_ty: ty), *) => {
$(
impl<'a> TryFrom<&'a Vec<u8>> for $val_ty {
type Error = Error;
fn try_from(value: &'a Vec<u8>) -> Result<Self> {
serde_json::from_slice(value).context(SerdeJsonSnafu)
}
}
)*
};
}
// These two value types are decoded from raw key-value bytes by the `build_decode_fn`
// helpers of their managers.
impl_try_from! {TableInfoValue, TableRouteValue}
impl_table_meta_value! {
CatalogNameValue,
SchemaNameValue,
@@ -221,7 +508,20 @@ impl_table_meta_value! {
#[cfg(test)]
mod tests {
use crate::key::to_removed_key;
use std::collections::BTreeMap;
use std::sync::Arc;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder};
use super::datanode_table::DatanodeTableKey;
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::TableRouteValue;
use crate::key::{to_removed_key, TableMetadataManager};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{region_distribution, Region, RegionRoute};
#[test]
fn test_to_removed_key() {
@@ -229,4 +529,261 @@ mod tests {
let removed = "__removed-test_key";
assert_eq!(removed, to_removed_key(key));
}
/// Builds the `TableInfo` fixture shared by the tests below: table id 10, table
/// version 5, name "mytable", with columns `col1`, `ts` (time index) and `col2`.
fn new_test_table_info() -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap();
let meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(10)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}
/// Returns the default region route fixture: region 1 led by datanode 2.
fn new_test_region_route() -> RegionRoute {
new_region_route(1, 2)
}
/// Builds a `RegionRoute` for `region_id` whose leader peer is `datanode` and which
/// has no followers. The region name and peer address are fixed placeholder strings.
fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute {
RegionRoute {
region: Region {
id: region_id.into(),
name: "r1".to_string(),
partition: None,
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(datanode, "a2")),
follower_peers: vec![],
}
}
// Creating metadata is idempotent for identical input and fails for conflicting input.
#[tokio::test]
async fn test_create_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let table_info: RawTableInfo = new_test_table_info().into();
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.await
.unwrap();
// if metadata was already created, it should be ok.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.await
.unwrap();
let mut modified_region_routes = region_routes.clone();
modified_region_routes.push(region_route);
// if different remote metadata already exists, it should return an error.
assert!(table_metadata_manager
.create_table_metadata(table_info, modified_region_routes)
.await
.is_err());
}
// Deleting metadata succeeds, and deleting it a second time succeeds as well.
#[tokio::test]
async fn test_delete_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let table_info: RawTableInfo = new_test_table_info().into();
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.await
.unwrap();
let table_info_value = TableInfoValue::new(table_info);
// deletes metadata.
table_metadata_manager
.delete_table_metadata(table_info_value.clone(), region_routes.clone())
.await
.unwrap();
// if metadata was already deleted, it should be ok.
table_metadata_manager
.delete_table_metadata(table_info_value.clone(), region_routes.clone())
.await
.unwrap();
}
// Renaming is idempotent for the same target name and fails when the caller's view of
// the current table info does not match the stored one.
#[tokio::test]
async fn test_rename_table() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let table_info: RawTableInfo = new_test_table_info().into();
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.await
.unwrap();
let new_table_name = "another_name".to_string();
let table_info_value = TableInfoValue::new(table_info.clone());
// the first rename should be ok.
table_metadata_manager
.rename_table(table_info_value.clone(), new_table_name.clone())
.await
.unwrap();
// if remote metadata was already updated, it should be ok.
table_metadata_manager
.rename_table(table_info_value.clone(), new_table_name.clone())
.await
.unwrap();
let mut modified_table_info = table_info;
modified_table_info.name = "hi".to_string();
let modified_table_info_value = table_info_value.update(modified_table_info);
// if the table_info_value is wrong, it should return an error.
// The ABA problem.
assert!(table_metadata_manager
.rename_table(modified_table_info_value.clone(), new_table_name.clone())
.await
.is_err())
}
// Updating table info is idempotent for the same new value and fails when the caller's
// view of the current table info does not match the stored one.
#[tokio::test]
async fn test_update_table_info() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let table_info: RawTableInfo = new_test_table_info().into();
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.await
.unwrap();
let mut new_table_info = table_info.clone();
new_table_info.name = "hi".to_string();
let current_table_info_value = TableInfoValue::new(table_info.clone());
// should be ok.
table_metadata_manager
.update_table_info(current_table_info_value.clone(), new_table_info.clone())
.await
.unwrap();
// if table info was already updated, it should be ok.
table_metadata_manager
.update_table_info(current_table_info_value.clone(), new_table_info.clone())
.await
.unwrap();
let mut wrong_table_info = table_info.clone();
wrong_table_info.name = "wrong".to_string();
let wrong_table_info_value = current_table_info_value.update(wrong_table_info);
// if the current_table_info_value is wrong, it should return an error.
// The ABA problem.
assert!(table_metadata_manager
.update_table_info(wrong_table_info_value, new_table_info)
.await
.is_err())
}
/// Asserts that, for every datanode appearing in the distribution derived from
/// `region_routes`, the stored datanode table entry of `table_id` lists exactly the
/// regions of that datanode. Panics if an entry is missing or cannot be read.
async fn assert_datanode_table(
    table_metadata_manager: &TableMetadataManager,
    table_id: u32,
    region_routes: &[RegionRoute],
) {
    let expected = region_distribution(region_routes).unwrap();
    for (datanode, regions) in expected {
        let key = DatanodeTableKey::new(datanode, table_id);
        let stored = table_metadata_manager
            .datanode_table_manager()
            .get(&key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(stored.regions, regions);
    }
}
// Updating the table route keeps the datanode table entries in sync, is idempotent for
// the same new routes, and fails when the caller's view of the current route is wrong.
#[tokio::test]
async fn test_update_table_route() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let table_info: RawTableInfo = new_test_table_info().into();
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_id = table_info.ident.table_id;
let current_table_route_value = TableRouteValue::new(region_routes.clone());
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.await
.unwrap();
assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
let new_region_routes = vec![
new_region_route(1, 1),
new_region_route(2, 2),
new_region_route(3, 3),
];
// it should be ok.
table_metadata_manager
.update_table_route(
table_id,
current_table_route_value.clone(),
new_region_routes.clone(),
)
.await
.unwrap();
assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
// if the table route was already updated, it should be ok.
table_metadata_manager
.update_table_route(
table_id,
current_table_route_value.clone(),
new_region_routes.clone(),
)
.await
.unwrap();
// advances the local view to the value that is now stored remotely.
let current_table_route_value = current_table_route_value.update(new_region_routes.clone());
let new_region_routes = vec![new_region_route(4, 4), new_region_route(5, 5)];
// it should be ok.
table_metadata_manager
.update_table_route(
table_id,
current_table_route_value.clone(),
new_region_routes.clone(),
)
.await
.unwrap();
assert_datanode_table(&table_metadata_manager, table_id, &new_region_routes).await;
// if the current_table_route_value is wrong, it should return an error.
// The ABA problem.
let wrong_table_route_value = current_table_route_value.update(vec![
new_region_route(1, 1),
new_region_route(2, 2),
new_region_route(3, 3),
new_region_route(4, 4),
]);
assert!(table_metadata_manager
.update_table_route(table_id, wrong_table_route_value, new_region_routes)
.await
.is_err());
}
}

View File

@@ -17,6 +17,7 @@ use snafu::{ensure, OptionExt};
use store_api::storage::RegionNumber;
use table::metadata::TableId;
use super::table_region::RegionDistribution;
use super::{DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX};
use crate::error::{InvalidTableMetadataSnafu, MoveRegionSnafu, Result, UnexpectedSnafu};
use crate::key::{to_removed_key, TableMetaKey};
@@ -105,6 +106,88 @@ impl DatanodeTableManager {
.transpose()
}
/// Builds a transaction that writes one datanode table entry per datanode in
/// `distribution`, each holding the regions of `table_id` placed on that datanode.
///
/// The transaction carries no compare of its own; when merged with other transactions
/// its puts run only if the compares contributed by those transactions succeed.
pub fn build_create_txn(
    &self,
    table_id: TableId,
    distribution: RegionDistribution,
) -> Result<Txn> {
    let mut puts = Vec::new();
    for (datanode_id, regions) in distribution {
        let raw_key = DatanodeTableKey::new(datanode_id, table_id).as_raw_key();
        let raw_value = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?;
        puts.push(TxnOp::Put(raw_key, raw_value));
    }

    Ok(Txn::default().and_then(puts))
}
/// Builds a transaction that turns the datanode table entries of `table_id` from
/// `current_region_distribution` into `new_region_distribution`.
///
/// Entries of datanodes that no longer appear are deleted; entries of datanodes that
/// are new, or whose region list changed, are written; unchanged entries are left alone.
///
/// The transaction carries no compare of its own; when merged with other transactions
/// its operations run only if the compares contributed by those transactions succeed.
pub(crate) fn build_update_txn(
    &self,
    table_id: TableId,
    current_region_distribution: RegionDistribution,
    new_region_distribution: RegionDistribution,
) -> Result<Txn> {
    // Deletions come first: datanodes present now but absent from the new distribution.
    let mut ops: Vec<_> = current_region_distribution
        .keys()
        .filter(|datanode| !new_region_distribution.contains_key(*datanode))
        .map(|datanode| TxnOp::Delete(DatanodeTableKey::new(*datanode, table_id).as_raw_key()))
        .collect();

    // Then the puts: every datanode that is new or whose regions differ.
    for (datanode, regions) in new_region_distribution {
        if current_region_distribution.get(&datanode) == Some(&regions) {
            continue;
        }
        let raw_key = DatanodeTableKey::new(datanode, table_id).as_raw_key();
        let raw_value = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?;
        ops.push(TxnOp::Put(raw_key, raw_value));
    }

    Ok(Txn::default().and_then(ops))
}
/// Builds a transaction that deletes the datanode table entry of `table_id` for every
/// datanode in `distribution`.
///
/// The transaction carries no compare of its own; when merged with other transactions
/// its deletes run only if the compares contributed by those transactions succeed.
pub fn build_delete_txn(
    &self,
    table_id: TableId,
    distribution: RegionDistribution,
) -> Result<Txn> {
    let deletes = distribution
        .into_keys()
        .map(|datanode_id| TxnOp::Delete(DatanodeTableKey::new(datanode_id, table_id).as_raw_key()))
        .collect::<Vec<_>>();

    Ok(Txn::default().and_then(deletes))
}
/// Create DatanodeTable key and value. If the key already exists, check if the value is the same.
pub async fn create(
&self,

View File

@@ -19,6 +19,7 @@ use table::metadata::{RawTableInfo, TableId};
use super::TABLE_INFO_KEY_PREFIX;
use crate::error::{Result, UnexpectedSnafu};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest};
@@ -51,6 +52,13 @@ impl TableInfoValue {
version: 0,
}
}
/// Returns a new value that holds `new_table_info` and a version one higher than
/// this value's version. `self` is left unchanged.
pub(crate) fn update(&self, new_table_info: RawTableInfo) -> Self {
Self {
table_info: new_table_info,
version: self.version + 1,
}
}
}
pub struct TableInfoManager {
@@ -62,6 +70,100 @@ impl TableInfoManager {
Self { kv_backend }
}
/// Builds a transaction that stores `table_info_value` under the table info key of
/// `table_id`, expecting that key to be unoccupied.
///
/// If the key is occupied, the transaction reads the existing value instead. The
/// returned closure extracts that value from the transaction responses.
pub(crate) fn build_create_txn(
    &self,
    table_id: TableId,
    table_info_value: &TableInfoValue,
) -> Result<(
    Txn,
    impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableInfoValue>>,
)> {
    let raw_key = TableInfoKey::new(table_id).as_raw_key();
    let raw_value = table_info_value.try_as_raw_value()?;

    let key_is_absent = Compare::with_not_exist_value(raw_key.clone(), CompareOp::Equal);
    let put_value = TxnOp::Put(raw_key.clone(), raw_value);
    let read_existing = TxnOp::Get(raw_key.clone());

    let txn = Txn::default()
        .when(vec![key_is_absent])
        .and_then(vec![put_value])
        .or_else(vec![read_existing]);

    Ok((txn, Self::build_decode_fn(raw_key)))
}
/// Builds a transaction that replaces the table info of `table_id` with
/// `new_table_info_value`, expecting the remote value to equal
/// `current_table_info_value`.
///
/// If the comparison fails, the transaction reads the latest value instead. The
/// returned closure extracts that value from the transaction responses.
pub(crate) fn build_update_txn(
    &self,
    table_id: TableId,
    current_table_info_value: &TableInfoValue,
    new_table_info_value: &TableInfoValue,
) -> Result<(
    Txn,
    impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableInfoValue>>,
)> {
    let raw_key = TableInfoKey::new(table_id).as_raw_key();
    let current_raw_value = current_table_info_value.try_as_raw_value()?;
    let new_raw_value = new_table_info_value.try_as_raw_value()?;

    let value_is_current =
        Compare::with_value(raw_key.clone(), CompareOp::Equal, current_raw_value);
    let put_new = TxnOp::Put(raw_key.clone(), new_raw_value);
    let read_latest = TxnOp::Get(raw_key.clone());

    let txn = Txn::default()
        .when(vec![value_is_current])
        .and_then(vec![put_new])
        .or_else(vec![read_latest]);

    Ok((txn, Self::build_decode_fn(raw_key)))
}
/// Builds a transaction that deletes the table info key of `table_id` and stores
/// `table_info_value` under the corresponding "removed" key (see `to_removed_key`).
///
/// The transaction carries no compare of its own.
pub(crate) fn build_delete_txn(
    &self,
    table_id: TableId,
    table_info_value: &TableInfoValue,
) -> Result<Txn> {
    let raw_key = TableInfoKey::new(table_id).as_raw_key();
    let raw_value = table_info_value.try_as_raw_value()?;
    let removed_raw_key = to_removed_key(&String::from_utf8_lossy(&raw_key)).into_bytes();

    let ops = vec![
        TxnOp::Delete(raw_key),
        TxnOp::Put(removed_raw_key, raw_value),
    ];

    Ok(Txn::default().and_then(ops))
}
/// Returns a closure that scans transaction responses for the first key-value pair,
/// among the `ResponseGet` entries, whose key equals `raw_key`, and decodes its value
/// as a `TableInfoValue`. The closure yields `Ok(None)` when no such pair exists.
fn build_decode_fn(
    raw_key: Vec<u8>,
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableInfoValue>> {
    move |responses: &Vec<TxnOpResponse>| -> Result<Option<TableInfoValue>> {
        for response in responses {
            // Only get responses carry key-value pairs of interest.
            if let TxnOpResponse::ResponseGet(get) = response {
                for kv in &get.kvs {
                    if kv.key == raw_key {
                        return TableInfoValue::try_from(&kv.value).map(Some);
                    }
                }
            }
        }
        Ok(None)
    }
}
pub async fn get(&self, table_id: TableId) -> Result<Option<TableInfoValue>> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();

View File

@@ -150,6 +150,59 @@ impl TableNameManager {
Self { kv_backend }
}
/// Builds a transaction that maps the table name `key` to `table_id`.
///
/// The transaction carries no compare of its own; when merged with other transactions
/// its put runs only if the compares contributed by those transactions succeed.
pub(crate) fn build_create_txn(
    &self,
    key: &TableNameKey<'_>,
    table_id: TableId,
) -> Result<Txn> {
    let raw_value = TableNameValue::new(table_id).try_as_raw_value()?;
    let put_name = TxnOp::Put(key.as_raw_key(), raw_value);

    Ok(Txn::default().and_then(vec![put_name]))
}
/// Builds a transaction that removes the table name `key` and maps `new_key` to
/// `table_id` instead.
///
/// The transaction carries no compare of its own; when merged with other transactions
/// its operations run only if the compares contributed by those transactions succeed.
pub(crate) fn build_update_txn(
    &self,
    key: &TableNameKey<'_>,
    new_key: &TableNameKey<'_>,
    table_id: TableId,
) -> Result<Txn> {
    let raw_value = TableNameValue::new(table_id).try_as_raw_value()?;

    let ops = vec![
        TxnOp::Delete(key.as_raw_key()),
        TxnOp::Put(new_key.as_raw_key(), raw_value),
    ];

    Ok(Txn::default().and_then(ops))
}
/// Builds a transaction that deletes the table name `key` and stores its mapping to
/// `table_id` under the corresponding "removed" key (see `to_removed_key`).
///
/// The transaction carries no compare of its own; when merged with other transactions
/// its operations run only if the compares contributed by those transactions succeed.
pub(crate) fn build_delete_txn(
    &self,
    key: &TableNameKey<'_>,
    table_id: TableId,
) -> Result<Txn> {
    let raw_key = key.as_raw_key();
    let raw_value = TableNameValue::new(table_id).try_as_raw_value()?;
    let removed_raw_key = to_removed_key(&String::from_utf8_lossy(&raw_key)).into_bytes();

    let ops = vec![
        TxnOp::Delete(raw_key),
        TxnOp::Put(removed_raw_key, raw_value),
    ];

    Ok(Txn::default().and_then(ops))
}
/// Create TableName key and value. If the key already exists, check if the value is the same.
pub async fn create(&self, key: &TableNameKey<'_>, table_id: TableId) -> Result<()> {
let raw_key = key.as_raw_key();

View File

@@ -15,15 +15,19 @@
use std::fmt::Display;
use api::v1::meta::TableName;
use futures::future::try_join_all;
use serde::__private::de;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use table::metadata::TableId;
use crate::error::{Result, UnexpectedSnafu};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnRequest};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{RegionRoute, Table, TableRoute};
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest};
use crate::rpc::router::{region_distribution, RegionRoute, Table, TableRoute};
use crate::rpc::store::{BatchGetRequest, CompareAndPutRequest, MoveValueRequest};
use crate::rpc::KeyValue;
pub const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";
@@ -53,6 +57,13 @@ impl TableRouteValue {
version: 0,
}
}
/// Returns a new value that holds `region_routes` and a version one higher than
/// this value's version. `self` is left unchanged.
pub fn update(&self, region_routes: Vec<RegionRoute>) -> Self {
Self {
region_routes,
version: self.version + 1,
}
}
}
impl TableMetaKey for NextTableRouteKey {
@@ -76,6 +87,99 @@ impl TableRouteManager {
Self { kv_backend }
}
/// Builds a transaction that stores `table_route_value` under the table route key of
/// `table_id`, expecting that key to be unoccupied.
///
/// If the key is occupied, the transaction reads the existing value instead. The
/// returned closure extracts that value from the transaction responses.
pub(crate) fn build_create_txn(
    &self,
    table_id: TableId,
    table_route_value: &TableRouteValue,
) -> Result<(
    Txn,
    impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableRouteValue>>,
)> {
    let raw_key = NextTableRouteKey::new(table_id).as_raw_key();
    let raw_value = table_route_value.try_as_raw_value()?;

    let key_is_absent = Compare::with_not_exist_value(raw_key.clone(), CompareOp::Equal);
    let put_value = TxnOp::Put(raw_key.clone(), raw_value);
    let read_existing = TxnOp::Get(raw_key.clone());

    let txn = Txn::default()
        .when(vec![key_is_absent])
        .and_then(vec![put_value])
        .or_else(vec![read_existing]);

    Ok((txn, Self::build_decode_fn(raw_key)))
}
/// Builds a transaction that replaces the table route of `table_id` with
/// `new_table_route_value`, expecting the remote value to equal
/// `current_table_route_value`.
///
/// If the comparison fails, the transaction reads the latest value instead. The
/// returned closure extracts that value from the transaction responses.
pub(crate) fn build_update_txn(
    &self,
    table_id: TableId,
    current_table_route_value: &TableRouteValue,
    new_table_route_value: &TableRouteValue,
) -> Result<(
    Txn,
    impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableRouteValue>>,
)> {
    let raw_key = NextTableRouteKey::new(table_id).as_raw_key();
    let current_raw_value = current_table_route_value.try_as_raw_value()?;
    let new_raw_value = new_table_route_value.try_as_raw_value()?;

    let value_is_current =
        Compare::with_value(raw_key.clone(), CompareOp::Equal, current_raw_value);
    let put_new = TxnOp::Put(raw_key.clone(), new_raw_value);
    let read_latest = TxnOp::Get(raw_key.clone());

    let txn = Txn::default()
        .when(vec![value_is_current])
        .and_then(vec![put_new])
        .or_else(vec![read_latest]);

    Ok((txn, Self::build_decode_fn(raw_key)))
}
/// Builds a transaction that deletes the table route key of `table_id` and stores
/// `table_route_value` under the corresponding "removed" key (see `to_removed_key`).
///
/// The transaction carries no compare of its own: the remote value is not checked
/// against `table_route_value` before it is deleted.
pub(crate) fn build_delete_txn(
    &self,
    table_id: TableId,
    table_route_value: &TableRouteValue,
) -> Result<Txn> {
    let raw_key = NextTableRouteKey::new(table_id).as_raw_key();
    let raw_value = table_route_value.try_as_raw_value()?;
    let removed_raw_key = to_removed_key(&String::from_utf8_lossy(&raw_key)).into_bytes();

    let ops = vec![
        TxnOp::Delete(raw_key),
        TxnOp::Put(removed_raw_key, raw_value),
    ];

    Ok(Txn::default().and_then(ops))
}
/// Returns a closure that scans transaction responses for the first key-value pair,
/// among the `ResponseGet` entries, whose key equals `raw_key`, and decodes its value
/// as a `TableRouteValue`. The closure yields `Ok(None)` when no such pair exists.
fn build_decode_fn(
    raw_key: Vec<u8>,
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableRouteValue>> {
    move |response: &Vec<TxnOpResponse>| -> Result<Option<TableRouteValue>> {
        for op_response in response {
            // Only get responses carry key-value pairs of interest.
            if let TxnOpResponse::ResponseGet(get) = op_response {
                for kv in &get.kvs {
                    if kv.key == raw_key {
                        return TableRouteValue::try_from(&kv.value).map(Some);
                    }
                }
            }
        }
        Ok(None)
    }
}
pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
let key = NextTableRouteKey::new(table_id);
self.kv_backend

View File

@@ -100,6 +100,14 @@ pub struct TxnRequest {
pub failure: Vec<TxnOp>,
}
impl TxnRequest {
/// Appends the compares, success operations and failure operations of `other` to the
/// corresponding lists of this request, keeping this request's entries first.
pub fn extend(&mut self, other: TxnRequest) {
self.compare.extend(other.compare);
self.success.extend(other.success);
self.failure.extend(other.failure);
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum TxnOpResponse {
ResponsePut(PutResponse),
@@ -121,6 +129,23 @@ pub struct Txn {
}
impl Txn {
/// Merges all transactions in `values` into one by folding them left to right with
/// [`Txn::merge`]. Returns the default transaction when `values` is empty.
pub fn merge_all<T: IntoIterator<Item = Txn>>(values: T) -> Self {
values
.into_iter()
.reduce(|acc, e| acc.merge(e))
.unwrap_or_default()
}
/// Merges `other` into this transaction and returns the result.
///
/// The `c_when` / `c_then` / `c_else` flags are OR-ed together and the request of
/// `other` is appended to this transaction's request.
// NOTE(review): the flags presumably record whether `when` / `and_then` / `or_else`
// has been called on the builder — confirm against the `Txn` definition.
pub fn merge(mut self, other: Txn) -> Self {
self.c_when |= other.c_when;
self.c_then |= other.c_then;
self.c_else |= other.c_else;
self.req.extend(other.req);
self
}
/// Creates a transaction equal to `Txn::default()`.
pub fn new() -> Self {
Txn::default()
}

View File

@@ -26,6 +26,7 @@ use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::table_region::RegionDistribution;
use crate::peer::Peer;
use crate::rpc::util;
use crate::table_name::TableName;
@@ -73,6 +74,23 @@ impl TryFrom<PbRouteResponse> for RouteResponse {
}
}
/// Groups the regions in `region_routes` by the id of their leader peer.
///
/// For each route, the region number of the route's region id is appended to the list
/// of its leader's node id, in the order the routes are given.
///
/// # Errors
///
/// Returns an `Unexpected` error if any route has no leader peer.
pub(crate) fn region_distribution(region_routes: &[RegionRoute]) -> Result<RegionDistribution> {
    let mut distribution = RegionDistribution::new();

    for route in region_routes {
        let leader = route
            .leader_peer
            .as_ref()
            .context(error::UnexpectedSnafu {
                err_msg: "leader not found",
            })?;
        let region_number = route.region.id.region_number();
        distribution
            .entry(leader.id)
            .or_default()
            .push(region_number);
    }

    Ok(distribution)
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct TableRoute {
pub table: Table,