From b51089fa61854a9a802b2b286ef1ba6f7c82e2c5 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 9 Apr 2024 09:48:35 +0800 Subject: [PATCH] fix: `DeserializedValueWithBytes::from_inner` misusing (#3676) * fix: fix `DeserializedValueWithBytes::from_inner` misusing * Update src/common/meta/src/key.rs --------- Co-authored-by: tison --- .../meta/src/ddl/alter_logical_tables.rs | 5 +-- .../src/ddl/alter_logical_tables/metadata.rs | 13 ++++---- .../alter_logical_tables/update_metadata.rs | 14 ++++---- src/common/meta/src/ddl_manager.rs | 6 ++-- src/common/meta/src/key.rs | 6 ++-- src/common/meta/src/key/table_info.rs | 32 +++++++++++++++++++ src/common/meta/src/key/table_route.rs | 31 ++++++++++++++++++ 7 files changed, 84 insertions(+), 23 deletions(-) diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index a5a64b0efd..6819f18941 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -35,6 +35,7 @@ use crate::ddl::DdlContext; use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result}; use crate::key::table_info::TableInfoValue; use crate::key::table_route::PhysicalTableRouteValue; +use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::find_leaders; @@ -245,10 +246,10 @@ pub struct AlterTablesData { tasks: Vec, /// Table info values before the alter operation. /// Corresponding one-to-one with the AlterTableTask in tasks. - table_info_values: Vec, + table_info_values: Vec>, /// Physical table info physical_table_id: TableId, - physical_table_info: Option, + physical_table_info: Option>, physical_table_route: Option, physical_columns: Vec, } diff --git a/src/common/meta/src/ddl/alter_logical_tables/metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/metadata.rs index 60dde7a634..8734b4ef37 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/metadata.rs @@ -24,6 +24,7 @@ use crate::error::{ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; +use crate::key::DeserializedValueWithBytes; use crate::rpc::ddl::AlterTableTask; impl AlterLogicalTablesProcedure { @@ -61,11 +62,9 @@ impl AlterLogicalTablesProcedure { .get_full_table_info(self.data.physical_table_id) .await?; - let physical_table_info = physical_table_info - .with_context(|| TableInfoNotFoundSnafu { - table: format!("table id - {}", self.data.physical_table_id), - })? - .into_inner(); + let physical_table_info = physical_table_info.with_context(|| TableInfoNotFoundSnafu { + table: format!("table id - {}", self.data.physical_table_id), + })?; let physical_table_route = physical_table_route .context(TableRouteNotFoundSnafu { table_id: self.data.physical_table_id, @@ -99,9 +98,9 @@ impl AlterLogicalTablesProcedure { async fn get_all_table_info_values( &self, table_ids: &[TableId], - ) -> Result> { + ) -> Result>> { let table_info_manager = self.context.table_metadata_manager.table_info_manager(); - let mut table_info_map = table_info_manager.batch_get(table_ids).await?; + let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?; let mut table_info_values = Vec::with_capacity(table_ids.len()); for (table_id, task) in table_ids.iter().zip(self.data.tasks.iter()) { let table_info_value = diff --git a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs index 3c8623ea24..528261cbc7 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs @@ -33,6 +33,7 @@ impl AlterLogicalTablesProcedure { return Ok(()); } + // Safety: must exist. let physical_table_info = self.data.physical_table_info.as_ref().unwrap(); // Generates new table info @@ -45,10 +46,7 @@ impl AlterLogicalTablesProcedure { // Updates physical table's metadata self.context .table_metadata_manager - .update_table_info( - DeserializedValueWithBytes::from_inner(physical_table_info.clone()), - new_raw_table_info, - ) + .update_table_info(physical_table_info.clone(), new_raw_table_info) .await?; Ok(()) @@ -77,7 +75,9 @@ impl AlterLogicalTablesProcedure { Ok(()) } - pub(crate) fn build_update_metadata(&self) -> Result> { + pub(crate) fn build_update_metadata( + &self, + ) -> Result, RawTableInfo)>> { let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len()); for (task, table) in self .data @@ -94,8 +94,8 @@ impl AlterLogicalTablesProcedure { fn build_new_table_info( &self, task: &AlterTableTask, - table: &TableInfoValue, - ) -> Result<(TableInfoValue, RawTableInfo)> { + table: &DeserializedValueWithBytes, + ) -> Result<(DeserializedValueWithBytes, RawTableInfo)> { // Builds new_meta let table_info = TableInfo::try_from(table.table_info.clone()) .context(error::ConvertRawTableInfoSnafu)?; diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 82411ad793..d45b1a4710 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -489,16 +489,14 @@ async fn handle_drop_table_task( .await?; let (_, table_route_value) = table_metadata_manager .table_route_manager() - .get_physical_table_route(table_id) + .table_route_storage() + .get_raw_physical_table_route(table_id) .await?; let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu { table: table_ref.to_string(), })?; - let table_route_value = - DeserializedValueWithBytes::from_inner(TableRouteValue::Physical(table_route_value)); - let (id, _) = ddl_manager .submit_drop_table_task( cluster_id, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index ed03e9c7cf..ec6ba05585 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -283,7 +283,7 @@ impl DeserializedValueWithByte self.bytes.to_vec() } - /// Notes: used for test purpose. + #[cfg(any(test, feature = "testing"))] pub fn from_inner(inner: T) -> Self { let bytes = serde_json::to_vec(&inner).unwrap(); @@ -687,7 +687,7 @@ impl TableMetadataManager { pub async fn batch_update_table_info_values( &self, - table_info_value_pairs: Vec<(TableInfoValue, RawTableInfo)>, + table_info_value_pairs: Vec<(DeserializedValueWithBytes, RawTableInfo)>, ) -> Result<()> { let len = table_info_value_pairs.len(); let mut txns = Vec::with_capacity(len); @@ -708,7 +708,7 @@ impl TableMetadataManager { let (update_table_info_txn, on_update_table_info_failure) = self.table_info_manager().build_update_txn( table_id, - &DeserializedValueWithBytes::from_inner(table_info_value), + &table_info_value, &new_table_info_value, )?; diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 495653dd67..a96142ded1 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -209,6 +209,38 @@ impl TableInfoManager { Ok(values) } + + /// Returns batch of `DeserializedValueWithBytes`. + pub async fn batch_get_raw( + &self, + table_ids: &[TableId], + ) -> Result>> { + let lookup_table = table_ids + .iter() + .map(|id| (TableInfoKey::new(*id).as_raw_key(), id)) + .collect::>(); + + let resp = self + .kv_backend + .batch_get(BatchGetRequest { + keys: lookup_table.keys().cloned().collect::>(), + }) + .await?; + + let values = resp + .kvs + .iter() + .map(|kv| { + Ok(( + // Safety: must exist. + **lookup_table.get(kv.key()).unwrap(), + DeserializedValueWithBytes::from_inner_slice(&kv.value)?, + )) + }) + .collect::>>()?; + + Ok(values) + } } #[cfg(test)] diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 82a5a4e1f6..3a87f2eb9d 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -517,6 +517,37 @@ impl TableRouteStorage { .transpose() } + /// Returns the physical `DeserializedValueWithBytes` recursively. + /// + /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if: + /// - the physical table(`logical_or_physical_table_id`) does not exist + /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist. + pub async fn get_raw_physical_table_route( + &self, + logical_or_physical_table_id: TableId, + ) -> Result<(TableId, DeserializedValueWithBytes)> { + let table_route = + self.get_raw(logical_or_physical_table_id) + .await? + .context(TableRouteNotFoundSnafu { + table_id: logical_or_physical_table_id, + })?; + + match table_route.get_inner_ref() { + TableRouteValue::Physical(_) => Ok((logical_or_physical_table_id, table_route)), + TableRouteValue::Logical(x) => { + let physical_table_id = x.physical_table_id(); + let physical_table_route = + self.get_raw(physical_table_id) + .await? + .context(TableRouteNotFoundSnafu { + table_id: physical_table_id, + })?; + Ok((physical_table_id, physical_table_route)) + } + } + } + /// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`. pub async fn batch_get(&self, table_ids: &[TableId]) -> Result>> { let keys = table_ids