refactor!: compare with origin bytes during the transactions (#2538)

* refactor: compare with origin bytes during the transaction

* refactor: use serialize_str instead

* Update src/common/meta/src/key.rs

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

* chore: apply suggestions from CR

---------

Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
Weny Xu
2023-10-09 17:17:19 +09:00
committed by GitHub
parent ce3c10a86e
commit 81ccb58fb4
17 changed files with 317 additions and 105 deletions

1
Cargo.lock generated
View File

@@ -1854,6 +1854,7 @@ dependencies = [
"async-stream",
"async-trait",
"base64 0.21.3",
"bytes",
"chrono",
"common-catalog",
"common-error",

View File

@@ -229,6 +229,7 @@ impl CatalogManager for KvBackendCatalogManager {
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner())
else {
return Ok(None);
};

View File

@@ -13,6 +13,7 @@ arrow-flight.workspace = true
async-stream.workspace = true
async-trait.workspace = true
base64 = "0.21"
bytes = "1.4"
common-catalog = { workspace = true }
common-error = { workspace = true }
common-grpc-expr.workspace = true

View File

@@ -45,6 +45,7 @@ use crate::error::{
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
@@ -63,7 +64,7 @@ impl AlterTableProcedure {
pub fn new(
cluster_id: u64,
task: AlterTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
context: DdlContext,
) -> Result<Self> {
let alter_kind = task
@@ -191,7 +192,8 @@ impl AlterTableProcedure {
.await?
.with_context(|| TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
})?
.into_inner();
let leaders = find_leaders(&region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
@@ -413,7 +415,7 @@ pub struct AlterTableData {
state: AlterTableState,
task: AlterTableTask,
/// Table info value before alteration.
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
cluster_id: u64,
/// Next column id of the table if the task adds columns to the table.
next_column_id: Option<ColumnId>,
@@ -422,7 +424,7 @@ pub struct AlterTableData {
impl AlterTableData {
pub fn new(
task: AlterTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
cluster_id: u64,
next_column_id: Option<ColumnId>,
) -> Self {

View File

@@ -39,6 +39,7 @@ use crate::error::{self, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::DropTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
@@ -55,8 +56,8 @@ impl DropTableProcedure {
pub fn new(
cluster_id: u64,
task: DropTableTask,
table_route_value: TableRouteValue,
table_info_value: TableInfoValue,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
context: DdlContext,
) -> Self {
Self {
@@ -231,16 +232,16 @@ pub struct DropTableData {
pub state: DropTableState,
pub cluster_id: u64,
pub task: DropTableTask,
pub table_route_value: TableRouteValue,
pub table_info_value: TableInfoValue,
pub table_route_value: DeserializedValueWithBytes<TableRouteValue>,
pub table_info_value: DeserializedValueWithBytes<TableInfoValue>,
}
impl DropTableData {
pub fn new(
cluster_id: u64,
task: DropTableTask,
table_route_value: TableRouteValue,
table_info_value: TableInfoValue,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
) -> Self {
Self {
state: DropTableState::Prepare,

View File

@@ -35,6 +35,7 @@ use crate::ddl::DdlContext;
use crate::error::{Result, TableNotFoundSnafu};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
use crate::metrics;
use crate::rpc::ddl::TruncateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
@@ -90,7 +91,7 @@ impl TruncateTableProcedure {
pub(crate) fn new(
cluster_id: u64,
task: TruncateTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
context: DdlContext,
) -> Self {
@@ -188,7 +189,7 @@ pub struct TruncateTableData {
state: TruncateTableState,
cluster_id: u64,
task: TruncateTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
}
@@ -196,7 +197,7 @@ impl TruncateTableData {
pub fn new(
cluster_id: u64,
task: TruncateTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
) -> Self {
Self {

View File

@@ -35,7 +35,7 @@ use crate::error::{
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable};
use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
@@ -144,7 +144,7 @@ impl DdlManager {
&self,
cluster_id: u64,
alter_table_task: AlterTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
) -> Result<ProcedureId> {
let context = self.create_context();
@@ -176,8 +176,8 @@ impl DdlManager {
&self,
cluster_id: u64,
drop_table_task: DropTableTask,
table_info_value: TableInfoValue,
table_route_value: TableRouteValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
) -> Result<ProcedureId> {
let context = self.create_context();
@@ -198,7 +198,7 @@ impl DdlManager {
&self,
cluster_id: u64,
truncate_table_task: TruncateTableTask,
table_info_value: TableInfoValue,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
) -> Result<ProcedureId> {
let context = self.create_context();
@@ -252,7 +252,7 @@ async fn handle_truncate_table_task(
table_name: table_ref.to_string(),
})?;
let table_route = table_route_value.region_routes;
let table_route = table_route_value.into_inner().region_routes;
let id = ddl_manager
.submit_truncate_table_task(

View File

@@ -56,12 +56,17 @@ pub mod table_region;
pub mod table_route;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;
use bytes::Bytes;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use lazy_static::lazy_static;
use regex::Regex;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, TableId};
@@ -155,6 +160,116 @@ macro_rules! ensure_values {
};
}
/// A struct containing a deserialized value(`inner`) and an original bytes.
///
/// - Serialize behaviors:
///
/// The `inner` field will be ignored.
///
/// - Deserialize behaviors:
///
/// The `inner` field will be deserialized from the `bytes` field.
pub struct DeserializedValueWithBytes<T: DeserializeOwned + Serialize> {
// The original bytes of the inner.
bytes: Bytes,
// The value was deserialized from the original bytes.
inner: T,
}
impl<T: DeserializeOwned + Serialize> Deref for DeserializedValueWithBytes<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T: DeserializeOwned + Serialize + Debug> Debug for DeserializedValueWithBytes<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DeserializedValueWithBytes(inner: {:?}, bytes: {:?})",
self.inner, self.bytes
)
}
}
impl<T: DeserializeOwned + Serialize> Serialize for DeserializedValueWithBytes<T> {
/// - Serialize behaviors:
///
/// The `inner` field will be ignored.
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
// Safety: The original bytes are always JSON encoded.
// It's more efficiently than `serialize_bytes`.
serializer.serialize_str(&String::from_utf8_lossy(&self.bytes))
}
}
impl<'de, T: DeserializeOwned + Serialize> Deserialize<'de> for DeserializedValueWithBytes<T> {
/// - Deserialize behaviors:
///
/// The `inner` field will be deserialized from the `bytes` field.
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let buf = String::deserialize(deserializer)?;
let bytes = Bytes::from(buf);
let value = DeserializedValueWithBytes::from_inner_bytes(bytes)
.map_err(|err| serde::de::Error::custom(err.to_string()))?;
Ok(value)
}
}
impl<T: Serialize + DeserializeOwned + Clone> Clone for DeserializedValueWithBytes<T> {
fn clone(&self) -> Self {
Self {
bytes: self.bytes.clone(),
inner: self.inner.clone(),
}
}
}
impl<T: Serialize + DeserializeOwned> DeserializedValueWithBytes<T> {
/// Returns a struct containing a deserialized value and an original `bytes`.
/// It accepts original bytes of inner.
pub fn from_inner_bytes(bytes: Bytes) -> Result<Self> {
let inner = serde_json::from_slice(&bytes).context(error::SerdeJsonSnafu)?;
Ok(Self { bytes, inner })
}
/// Returns a struct containing a deserialized value and an original `bytes`.
/// It accepts original bytes of inner.
pub fn from_inner_slice(bytes: &[u8]) -> Result<Self> {
Self::from_inner_bytes(Bytes::copy_from_slice(bytes))
}
pub fn into_inner(self) -> T {
self.inner
}
/// Returns original `bytes`
pub fn into_bytes(&self) -> Vec<u8> {
self.bytes.to_vec()
}
#[cfg(feature = "testing")]
/// Notes: used for test purpose.
pub fn from_inner(inner: T) -> Self {
let bytes = serde_json::to_vec(&inner).unwrap();
Self {
bytes: Bytes::from(bytes),
inner,
}
}
}
impl TableMetadataManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
TableMetadataManager {
@@ -212,7 +327,10 @@ impl TableMetadataManager {
pub async fn get_full_table_info(
&self,
table_id: TableId,
) -> Result<(Option<TableInfoValue>, Option<TableRouteValue>)> {
) -> Result<(
Option<DeserializedValueWithBytes<TableInfoValue>>,
Option<DeserializedValueWithBytes<TableRouteValue>>,
)> {
let (get_table_route_txn, table_route_decoder) =
self.table_route_manager.build_get_txn(table_id);
@@ -291,15 +409,17 @@ impl TableMetadataManager {
// Checks whether metadata was already created.
if !r.succeeded {
let remote_table_info =
on_create_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu {
let remote_table_info = on_create_table_info_failure(&r.responses)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the create table metadata",
})?;
})?
.into_inner();
let remote_table_route =
on_create_table_route_failure(&r.responses)?.context(error::UnexpectedSnafu {
let remote_table_route = on_create_table_route_failure(&r.responses)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the create table metadata",
})?;
})?
.into_inner();
let op_name = "the creating table metadata";
ensure_values!(remote_table_info, table_info_value, op_name);
@@ -313,8 +433,8 @@ impl TableMetadataManager {
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
pub async fn delete_table_metadata(
&self,
table_info_value: &TableInfoValue,
table_route_value: &TableRouteValue,
table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
) -> Result<()> {
let table_info = &table_info_value.table_info;
let table_id = table_info.ident.table_id;
@@ -364,7 +484,7 @@ impl TableMetadataManager {
/// and the new `TableNameKey` MUST be empty.
pub async fn rename_table(
&self,
current_table_info_value: TableInfoValue,
current_table_info_value: DeserializedValueWithBytes<TableInfoValue>,
new_table_name: String,
) -> Result<()> {
let current_table_info = &current_table_info_value.table_info;
@@ -389,7 +509,9 @@ impl TableMetadataManager {
table_id,
)?;
let new_table_info_value = current_table_info_value.with_update(move |table_info| {
let new_table_info_value = current_table_info_value
.inner
.with_update(move |table_info| {
table_info.name = new_table_name;
});
@@ -404,10 +526,11 @@ impl TableMetadataManager {
// Checks whether metadata was already updated.
if !r.succeeded {
let remote_table_info =
on_update_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu {
let remote_table_info = on_update_table_info_failure(&r.responses)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the rename table metadata",
})?;
})?
.into_inner();
let op_name = "the renaming table metadata";
ensure_values!(remote_table_info, new_table_info_value, op_name);
@@ -419,7 +542,7 @@ impl TableMetadataManager {
/// Updates table info and returns an error if different metadata exists.
pub async fn update_table_info(
&self,
current_table_info_value: TableInfoValue,
current_table_info_value: DeserializedValueWithBytes<TableInfoValue>,
new_table_info: RawTableInfo,
) -> Result<()> {
let table_id = current_table_info_value.table_info.ident.table_id;
@@ -435,10 +558,11 @@ impl TableMetadataManager {
// Checks whether metadata was already updated.
if !r.succeeded {
let remote_table_info =
on_update_table_info_failure(&r.responses)?.context(error::UnexpectedSnafu {
let remote_table_info = on_update_table_info_failure(&r.responses)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the updating table info",
})?;
})?
.into_inner();
let op_name = "the updating table info";
ensure_values!(remote_table_info, new_table_info_value, op_name);
@@ -450,7 +574,7 @@ impl TableMetadataManager {
&self,
table_id: TableId,
region_info: RegionInfo,
current_table_route_value: TableRouteValue,
current_table_route_value: DeserializedValueWithBytes<TableRouteValue>,
new_region_routes: Vec<RegionRoute>,
new_region_options: &HashMap<String, String>,
) -> Result<()> {
@@ -480,10 +604,11 @@ impl TableMetadataManager {
// Checks whether metadata was already updated.
if !r.succeeded {
let remote_table_route =
on_update_table_route_failure(&r.responses)?.context(error::UnexpectedSnafu {
let remote_table_route = on_update_table_route_failure(&r.responses)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the updating table route",
})?;
})?
.into_inner();
let op_name = "the updating table route";
ensure_values!(remote_table_route, new_table_route_value, op_name);
@@ -559,6 +684,7 @@ mod tests {
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use bytes::Bytes;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use futures::TryStreamExt;
@@ -570,11 +696,39 @@ mod tests {
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::{to_removed_key, TableMetadataManager};
use crate::key::{to_removed_key, DeserializedValueWithBytes, TableMetadataManager};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{region_distribution, Region, RegionRoute};
#[test]
fn test_deserialized_value_with_bytes() {
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let expected_region_routes =
TableRouteValue::new(vec![region_route.clone(), region_route.clone()]);
let expected = serde_json::to_vec(&expected_region_routes).unwrap();
// Serialize behaviors:
// The inner field will be ignored.
let value = DeserializedValueWithBytes {
// ignored
inner: TableRouteValue::new(region_routes.clone()),
bytes: Bytes::from(expected.clone()),
};
let encoded = serde_json::to_vec(&value).unwrap();
// Deserialize behaviors:
// The inner field will be deserialized from the bytes field.
let decoded: DeserializedValueWithBytes<TableRouteValue> =
serde_json::from_slice(&encoded).unwrap();
assert_eq!(decoded.inner, expected_region_routes);
assert_eq!(decoded.bytes, expected);
}
#[test]
fn test_to_removed_key() {
let key = "test_key";
@@ -664,8 +818,14 @@ mod tests {
.await
.unwrap();
assert_eq!(remote_table_info.unwrap().table_info, table_info);
assert_eq!(remote_table_route.unwrap().region_routes, region_routes);
assert_eq!(
remote_table_info.unwrap().into_inner().table_info,
table_info
);
assert_eq!(
remote_table_route.unwrap().into_inner().region_routes,
region_routes
);
}
#[tokio::test]
@@ -678,7 +838,8 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_id = table_info.ident.table_id;
let datanode_id = 2;
let table_route_value = TableRouteValue::new(region_routes.clone());
let table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
// creates metadata.
table_metadata_manager
@@ -686,7 +847,8 @@ mod tests {
.await
.unwrap();
let table_info_value = TableInfoValue::new(table_info.clone());
let table_info_value =
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
// deletes metadata.
table_metadata_manager
@@ -727,7 +889,8 @@ mod tests {
.get_removed(table_id)
.await
.unwrap()
.unwrap();
.unwrap()
.into_inner();
assert_eq!(removed_table_info.table_info, table_info);
let removed_table_route = table_metadata_manager
@@ -735,7 +898,8 @@ mod tests {
.get_removed(table_id)
.await
.unwrap()
.unwrap();
.unwrap()
.into_inner();
assert_eq!(removed_table_route.region_routes, region_routes);
}
@@ -754,7 +918,9 @@ mod tests {
.await
.unwrap();
let new_table_name = "another_name".to_string();
let table_info_value = TableInfoValue::new(table_info.clone());
let table_info_value =
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
table_metadata_manager
.rename_table(table_info_value.clone(), new_table_name.clone())
.await
@@ -766,7 +932,8 @@ mod tests {
.unwrap();
let mut modified_table_info = table_info.clone();
modified_table_info.name = "hi".to_string();
let modified_table_info_value = table_info_value.update(modified_table_info);
let modified_table_info_value =
DeserializedValueWithBytes::from_inner(table_info_value.update(modified_table_info));
// if the table_info_value is wrong, it should return an error.
// The ABA problem.
assert!(table_metadata_manager
@@ -820,7 +987,8 @@ mod tests {
.unwrap();
let mut new_table_info = table_info.clone();
new_table_info.name = "hi".to_string();
let current_table_info_value = TableInfoValue::new(table_info.clone());
let current_table_info_value =
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
// should be ok.
table_metadata_manager
.update_table_info(current_table_info_value.clone(), new_table_info.clone())
@@ -838,12 +1006,15 @@ mod tests {
.get(table_id)
.await
.unwrap()
.unwrap();
.unwrap()
.into_inner();
assert_eq!(updated_table_info.table_info, new_table_info);
let mut wrong_table_info = table_info.clone();
wrong_table_info.name = "wrong".to_string();
let wrong_table_info_value = current_table_info_value.update(wrong_table_info);
let wrong_table_info_value = DeserializedValueWithBytes::from_inner(
current_table_info_value.update(wrong_table_info),
);
// if the current_table_info_value is wrong, it should return an error.
// The ABA problem.
assert!(table_metadata_manager
@@ -882,7 +1053,8 @@ mod tests {
let engine = table_info.meta.engine.as_str();
let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
let current_table_route_value = TableRouteValue::new(region_routes.clone());
let current_table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
@@ -927,7 +1099,11 @@ mod tests {
.await
.unwrap();
let current_table_route_value = current_table_route_value.update(new_region_routes.clone());
let current_table_route_value = DeserializedValueWithBytes::from_inner(
current_table_route_value
.inner
.update(new_region_routes.clone()),
);
let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)];
// it should be ok.
table_metadata_manager
@@ -948,12 +1124,13 @@ mod tests {
// if the current_table_route_value is wrong, it should return an error.
// The ABA problem.
let wrong_table_route_value = current_table_route_value.update(vec![
let wrong_table_route_value =
DeserializedValueWithBytes::from_inner(current_table_route_value.update(vec![
new_region_route(1, 1),
new_region_route(2, 2),
new_region_route(3, 3),
new_region_route(4, 4),
]);
]));
assert!(table_metadata_manager
.update_table_route(
table_id,

View File

@@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use super::TABLE_INFO_KEY_PREFIX;
use super::{DeserializedValueWithBytes, TABLE_INFO_KEY_PREFIX};
use crate::error::Result;
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
@@ -103,7 +103,7 @@ impl TableInfoManager {
table_id: TableId,
) -> (
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableInfoValue>>,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
) {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
@@ -119,7 +119,7 @@ impl TableInfoManager {
table_info_value: &TableInfoValue,
) -> Result<(
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableInfoValue>>,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
)> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
@@ -143,15 +143,15 @@ impl TableInfoManager {
pub(crate) fn build_update_txn(
&self,
table_id: TableId,
current_table_info_value: &TableInfoValue,
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
new_table_info_value: &TableInfoValue,
) -> Result<(
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableInfoValue>>,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
)> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = current_table_info_value.try_as_raw_value()?;
let raw_value = current_table_info_value.into_bytes();
let txn = Txn::new()
.when(vec![Compare::with_value(
@@ -172,11 +172,11 @@ impl TableInfoManager {
pub(crate) fn build_delete_txn(
&self,
table_id: TableId,
table_info_value: &TableInfoValue,
table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
) -> Result<Txn> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = table_info_value.try_as_raw_value()?;
let raw_value = table_info_value.into_bytes();
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
let txn = Txn::new().and_then(vec![
@@ -189,7 +189,8 @@ impl TableInfoManager {
fn build_decode_fn(
raw_key: Vec<u8>,
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableInfoValue>> {
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>
{
move |kvs: &Vec<TxnOpResponse>| {
kvs.iter()
.filter_map(|resp| {
@@ -201,29 +202,35 @@ impl TableInfoManager {
})
.flat_map(|r| &r.kvs)
.find(|kv| kv.key == raw_key)
.map(|kv| TableInfoValue::try_from_raw_value(&kv.value))
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
.transpose()
}
}
#[cfg(test)]
pub async fn get_removed(&self, table_id: TableId) -> Result<Option<TableInfoValue>> {
pub async fn get_removed(
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>> {
let key = TableInfoKey::new(table_id).to_string();
let removed_key = to_removed_key(&key).into_bytes();
self.kv_backend
.get(&removed_key)
.await?
.map(|x| TableInfoValue::try_from_raw_value(&x.value))
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
pub async fn get(&self, table_id: TableId) -> Result<Option<TableInfoValue>> {
pub async fn get(
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
self.kv_backend
.get(&raw_key)
.await?
.map(|x| TableInfoValue::try_from_raw_value(&x.value))
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
}

View File

@@ -17,6 +17,7 @@ use std::fmt::Display;
use serde::{Deserialize, Serialize};
use table::metadata::TableId;
use super::DeserializedValueWithBytes;
use crate::error::Result;
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
@@ -81,7 +82,7 @@ impl TableRouteManager {
table_id: TableId,
) -> (
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableRouteValue>>,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
) {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
@@ -97,7 +98,7 @@ impl TableRouteManager {
table_route_value: &TableRouteValue,
) -> Result<(
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableRouteValue>>,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
)> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
@@ -121,15 +122,15 @@ impl TableRouteManager {
pub(crate) fn build_update_txn(
&self,
table_id: TableId,
current_table_route_value: &TableRouteValue,
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
new_table_route_value: &TableRouteValue,
) -> Result<(
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableRouteValue>>,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
)> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = current_table_route_value.try_as_raw_value()?;
let raw_value = current_table_route_value.into_bytes();
let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
let txn = Txn::new()
@@ -148,11 +149,11 @@ impl TableRouteManager {
pub(crate) fn build_delete_txn(
&self,
table_id: TableId,
table_route_value: &TableRouteValue,
table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
) -> Result<Txn> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = table_route_value.try_as_raw_value()?;
let raw_value = table_route_value.into_bytes();
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
let txn = Txn::new().and_then(vec![
@@ -165,7 +166,8 @@ impl TableRouteManager {
fn build_decode_fn(
raw_key: Vec<u8>,
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<TableRouteValue>> {
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>
{
move |response: &Vec<TxnOpResponse>| {
response
.iter()
@@ -178,28 +180,34 @@ impl TableRouteManager {
})
.flat_map(|r| &r.kvs)
.find(|kv| kv.key == raw_key)
.map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
.transpose()
}
}
pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
pub async fn get(
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
let key = TableRouteKey::new(table_id);
self.kv_backend
.get(&key.as_raw_key())
.await?
.map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
.transpose()
}
#[cfg(test)]
pub async fn get_removed(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
pub async fn get_removed(
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
let key = TableRouteKey::new(table_id).to_string();
let removed_key = to_removed_key(&key).into_bytes();
self.kv_backend
.get(&removed_key)
.await?
.map(|x| TableRouteValue::try_from_raw_value(&x.value))
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
@@ -209,7 +217,7 @@ impl TableRouteManager {
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
.await?
.map(|table_route| region_distribution(&table_route.region_routes))
.map(|table_route| region_distribution(&table_route.into_inner().region_routes))
.transpose()
}
}

View File

@@ -70,6 +70,7 @@ impl ActivateRegion {
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::TableInfoNotFoundSnafu { table_id })?
.into_inner()
.table_info;
let region_storage_path =

View File

@@ -220,6 +220,7 @@ mod tests {
.await
.unwrap()
.unwrap()
.into_inner()
.region_routes
}
@@ -373,7 +374,8 @@ mod tests {
.get(table_id)
.await
.unwrap()
.unwrap();
.unwrap()
.into_inner();
let peers = &extract_all_peers(&table_route_value.region_routes);
let actual = &table_route_value.region_routes;
@@ -392,7 +394,8 @@ mod tests {
.get(table_id)
.await
.unwrap()
.unwrap();
.unwrap()
.into_inner();
let map = region_distribution(&table_route_value.region_routes).unwrap();
assert_eq!(map.len(), 2);

View File

@@ -31,6 +31,7 @@ use common_meta::ddl::create_table::*;
use common_meta::ddl::drop_table::DropTableProcedure;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::DeserializedValueWithBytes;
use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DropTableTask};
use common_meta::rpc::router::{find_leaders, RegionRoute};
use common_procedure::Status;
@@ -235,8 +236,8 @@ async fn test_on_datanode_drop_regions() {
let procedure = DropTableProcedure::new(
1,
drop_table_task,
TableRouteValue::new(region_routes),
TableInfoValue::new(test_data::new_table_info()),
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes)),
DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
test_data::new_ddl_context(datanode_manager),
);
@@ -299,7 +300,7 @@ fn test_create_alter_region_request() {
let procedure = AlterTableProcedure::new(
1,
alter_table_task,
TableInfoValue::new(test_data::new_table_info()),
DeserializedValueWithBytes::from_inner(TableInfoValue::new(test_data::new_table_info())),
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
)
.unwrap();
@@ -364,7 +365,7 @@ async fn test_submit_alter_region_requests() {
let mut procedure = AlterTableProcedure::new(
1,
alter_table_task,
TableInfoValue::new(table_info),
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info)),
context,
)
.unwrap();

View File

@@ -32,7 +32,9 @@ pub(crate) async fn fetch_table(
.context(TableMetadataManagerSnafu)?;
if let Some(table_info) = table_info {
let table_route = table_route.context(TableRouteNotFoundSnafu { table_id })?;
let table_route = table_route
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let table = Table {
id: table_id as u64,
@@ -44,7 +46,7 @@ pub(crate) async fn fetch_table(
.try_into()
.context(error::TableRouteConversionSnafu)?;
Ok(Some((table_info, table_route_value)))
Ok(Some((table_info.into_inner(), table_route_value)))
} else {
Ok(None)
}

View File

@@ -71,7 +71,8 @@ impl PartitionRuleManager {
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?;
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
Ok(RegionRoutes(route.region_routes))
}
@@ -87,7 +88,8 @@ impl PartitionRuleManager {
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?;
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
let mut datanodes = HashMap::with_capacity(regions.len());
let region_map = convert_to_region_map(&route.region_routes);
for region in regions.iter() {
@@ -110,7 +112,8 @@ impl PartitionRuleManager {
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?;
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
let mut peers = Vec::with_capacity(route.region_routes.len());
for peer in &route.region_routes {
@@ -129,7 +132,8 @@ impl PartitionRuleManager {
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?;
.context(error::FindTableRoutesSnafu { table_id })?
.into_inner();
ensure!(
!route.region_routes.is_empty(),
error::FindTableRoutesSnafu { table_id }

View File

@@ -516,7 +516,8 @@ CREATE TABLE {table_name} (
.get(table_id)
.await
.unwrap()
.unwrap();
.unwrap()
.into_inner();
let region_to_dn_map = region_distribution(&table_route_value.region_routes)
.unwrap()

View File

@@ -213,7 +213,8 @@ mod tests {
.get(table_id)
.await
.unwrap()
.unwrap();
.unwrap()
.into_inner();
let region_to_dn_map = region_distribution(&table_route_value.region_routes)
.unwrap()