mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
fix: DeserializedValueWithBytes::from_inner misusing (#3676)
* fix: fix `DeserializedValueWithBytes::from_inner` misusing * Update src/common/meta/src/key.rs --------- Co-authored-by: tison <wander4096@gmail.com>
This commit is contained in:
@@ -35,6 +35,7 @@ use crate::ddl::DdlContext;
|
||||
use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_route::PhysicalTableRouteValue;
|
||||
use crate::key::DeserializedValueWithBytes;
|
||||
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
|
||||
use crate::rpc::ddl::AlterTableTask;
|
||||
use crate::rpc::router::find_leaders;
|
||||
@@ -245,10 +246,10 @@ pub struct AlterTablesData {
|
||||
tasks: Vec<AlterTableTask>,
|
||||
/// Table info values before the alter operation.
|
||||
/// Corresponding one-to-one with the AlterTableTask in tasks.
|
||||
table_info_values: Vec<TableInfoValue>,
|
||||
table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
|
||||
/// Physical table info
|
||||
physical_table_id: TableId,
|
||||
physical_table_info: Option<TableInfoValue>,
|
||||
physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
|
||||
physical_table_route: Option<PhysicalTableRouteValue>,
|
||||
physical_columns: Vec<ColumnMetadata>,
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ use crate::error::{
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::key::DeserializedValueWithBytes;
|
||||
use crate::rpc::ddl::AlterTableTask;
|
||||
|
||||
impl AlterLogicalTablesProcedure {
|
||||
@@ -61,11 +62,9 @@ impl AlterLogicalTablesProcedure {
|
||||
.get_full_table_info(self.data.physical_table_id)
|
||||
.await?;
|
||||
|
||||
let physical_table_info = physical_table_info
|
||||
.with_context(|| TableInfoNotFoundSnafu {
|
||||
table: format!("table id - {}", self.data.physical_table_id),
|
||||
})?
|
||||
.into_inner();
|
||||
let physical_table_info = physical_table_info.with_context(|| TableInfoNotFoundSnafu {
|
||||
table: format!("table id - {}", self.data.physical_table_id),
|
||||
})?;
|
||||
let physical_table_route = physical_table_route
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: self.data.physical_table_id,
|
||||
@@ -99,9 +98,9 @@ impl AlterLogicalTablesProcedure {
|
||||
async fn get_all_table_info_values(
|
||||
&self,
|
||||
table_ids: &[TableId],
|
||||
) -> Result<Vec<TableInfoValue>> {
|
||||
) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
|
||||
let table_info_manager = self.context.table_metadata_manager.table_info_manager();
|
||||
let mut table_info_map = table_info_manager.batch_get(table_ids).await?;
|
||||
let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
|
||||
let mut table_info_values = Vec::with_capacity(table_ids.len());
|
||||
for (table_id, task) in table_ids.iter().zip(self.data.tasks.iter()) {
|
||||
let table_info_value =
|
||||
|
||||
@@ -33,6 +33,7 @@ impl AlterLogicalTablesProcedure {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Safety: must exist.
|
||||
let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
|
||||
|
||||
// Generates new table info
|
||||
@@ -45,10 +46,7 @@ impl AlterLogicalTablesProcedure {
|
||||
// Updates physical table's metadata
|
||||
self.context
|
||||
.table_metadata_manager
|
||||
.update_table_info(
|
||||
DeserializedValueWithBytes::from_inner(physical_table_info.clone()),
|
||||
new_raw_table_info,
|
||||
)
|
||||
.update_table_info(physical_table_info.clone(), new_raw_table_info)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
@@ -77,7 +75,9 @@ impl AlterLogicalTablesProcedure {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn build_update_metadata(&self) -> Result<Vec<(TableInfoValue, RawTableInfo)>> {
|
||||
pub(crate) fn build_update_metadata(
|
||||
&self,
|
||||
) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>> {
|
||||
let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
|
||||
for (task, table) in self
|
||||
.data
|
||||
@@ -94,8 +94,8 @@ impl AlterLogicalTablesProcedure {
|
||||
fn build_new_table_info(
|
||||
&self,
|
||||
task: &AlterTableTask,
|
||||
table: &TableInfoValue,
|
||||
) -> Result<(TableInfoValue, RawTableInfo)> {
|
||||
table: &DeserializedValueWithBytes<TableInfoValue>,
|
||||
) -> Result<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)> {
|
||||
// Builds new_meta
|
||||
let table_info = TableInfo::try_from(table.table_info.clone())
|
||||
.context(error::ConvertRawTableInfoSnafu)?;
|
||||
|
||||
@@ -489,16 +489,14 @@ async fn handle_drop_table_task(
|
||||
.await?;
|
||||
let (_, table_route_value) = table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get_physical_table_route(table_id)
|
||||
.table_route_storage()
|
||||
.get_raw_physical_table_route(table_id)
|
||||
.await?;
|
||||
|
||||
let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
|
||||
table: table_ref.to_string(),
|
||||
})?;
|
||||
|
||||
let table_route_value =
|
||||
DeserializedValueWithBytes::from_inner(TableRouteValue::Physical(table_route_value));
|
||||
|
||||
let (id, _) = ddl_manager
|
||||
.submit_drop_table_task(
|
||||
cluster_id,
|
||||
|
||||
@@ -283,7 +283,7 @@ impl<T: Serialize + DeserializeOwned + TableMetaValue> DeserializedValueWithByte
|
||||
self.bytes.to_vec()
|
||||
}
|
||||
|
||||
/// Notes: used for test purpose.
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn from_inner(inner: T) -> Self {
|
||||
let bytes = serde_json::to_vec(&inner).unwrap();
|
||||
|
||||
@@ -687,7 +687,7 @@ impl TableMetadataManager {
|
||||
|
||||
pub async fn batch_update_table_info_values(
|
||||
&self,
|
||||
table_info_value_pairs: Vec<(TableInfoValue, RawTableInfo)>,
|
||||
table_info_value_pairs: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
|
||||
) -> Result<()> {
|
||||
let len = table_info_value_pairs.len();
|
||||
let mut txns = Vec::with_capacity(len);
|
||||
@@ -708,7 +708,7 @@ impl TableMetadataManager {
|
||||
let (update_table_info_txn, on_update_table_info_failure) =
|
||||
self.table_info_manager().build_update_txn(
|
||||
table_id,
|
||||
&DeserializedValueWithBytes::from_inner(table_info_value),
|
||||
&table_info_value,
|
||||
&new_table_info_value,
|
||||
)?;
|
||||
|
||||
|
||||
@@ -209,6 +209,38 @@ impl TableInfoManager {
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
/// Returns batch of `DeserializedValueWithBytes<TableInfoValue>`.
|
||||
pub async fn batch_get_raw(
|
||||
&self,
|
||||
table_ids: &[TableId],
|
||||
) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableInfoValue>>> {
|
||||
let lookup_table = table_ids
|
||||
.iter()
|
||||
.map(|id| (TableInfoKey::new(*id).as_raw_key(), id))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let resp = self
|
||||
.kv_backend
|
||||
.batch_get(BatchGetRequest {
|
||||
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let values = resp
|
||||
.kvs
|
||||
.iter()
|
||||
.map(|kv| {
|
||||
Ok((
|
||||
// Safety: must exist.
|
||||
**lookup_table.get(kv.key()).unwrap(),
|
||||
DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<HashMap<_, _>>>()?;
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -517,6 +517,37 @@ impl TableRouteStorage {
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Returns the physical `DeserializedValueWithBytes<TableRouteValue>` recursively.
|
||||
///
|
||||
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
|
||||
/// - the physical table(`logical_or_physical_table_id`) does not exist
|
||||
/// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
|
||||
pub async fn get_raw_physical_table_route(
|
||||
&self,
|
||||
logical_or_physical_table_id: TableId,
|
||||
) -> Result<(TableId, DeserializedValueWithBytes<TableRouteValue>)> {
|
||||
let table_route =
|
||||
self.get_raw(logical_or_physical_table_id)
|
||||
.await?
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: logical_or_physical_table_id,
|
||||
})?;
|
||||
|
||||
match table_route.get_inner_ref() {
|
||||
TableRouteValue::Physical(_) => Ok((logical_or_physical_table_id, table_route)),
|
||||
TableRouteValue::Logical(x) => {
|
||||
let physical_table_id = x.physical_table_id();
|
||||
let physical_table_route =
|
||||
self.get_raw(physical_table_id)
|
||||
.await?
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: physical_table_id,
|
||||
})?;
|
||||
Ok((physical_table_id, physical_table_route))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
|
||||
pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
|
||||
let keys = table_ids
|
||||
|
||||
Reference in New Issue
Block a user