From aa569f7d6b263bbe428f2dd5ee56184c1c477f63 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Mon, 19 Feb 2024 14:51:34 +0800 Subject: [PATCH] feat: batch get physical table routes (#3319) * feat: batch get physical table routes * chore: by comment --- src/common/meta/src/error.rs | 6 ++- src/common/meta/src/key/table_route.rs | 64 +++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 383006e3cc..dc4c0cf51c 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -374,6 +374,9 @@ pub enum Error { #[snafu(display("The tasks of create tables cannot be empty"))] EmptyCreateTableTasks { location: Location }, + + #[snafu(display("Metadata corruption: {}", err_msg))] + MetadataCorruption { err_msg: String, location: Location }, } pub type Result = std::result::Result; @@ -417,7 +420,8 @@ impl ErrorExt for Error { | CreateKafkaWalTopic { .. } | EmptyTopicPool { .. } | UnexpectedLogicalRouteTable { .. } - | ProcedureOutput { .. } => StatusCode::Unexpected, + | ProcedureOutput { .. } + | MetadataCorruption { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 7886dc7997..713b1fa2bf 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Display; use serde::{Deserialize, Serialize}; @@ -22,7 +22,8 @@ use table::metadata::TableId; use super::{DeserializedValueWithBytes, TableMetaValue}; use crate::error::{ - Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu, + MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, + UnexpectedLogicalRouteTableSnafu, }; use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX}; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; @@ -388,6 +389,65 @@ impl TableRouteManager { } } + pub async fn batch_get_physical_table_routes( + &self, + logical_or_physical_table_ids: &[TableId], + ) -> Result> { + let table_routes = self.batch_get(logical_or_physical_table_ids).await?; + + let mut physical_table_routes = HashMap::with_capacity(table_routes.len()); + let mut logical_table_ids = HashMap::with_capacity(table_routes.len()); + + for (table_id, table_route) in table_routes { + match table_route { + TableRouteValue::Physical(x) => { + physical_table_routes.insert(table_id, x); + } + TableRouteValue::Logical(x) => { + logical_table_ids.insert(table_id, x.physical_table_id()); + } + } + } + + if logical_table_ids.is_empty() { + return Ok(physical_table_routes); + } + + let physical_table_ids = logical_table_ids + .values() + .cloned() + .collect::>() + .into_iter() + .collect::>(); + let table_routes = self.batch_get(&physical_table_ids).await?; + + for (logical_table_id, physical_table_id) in logical_table_ids { + let table_route = + table_routes + .get(&physical_table_id) + .context(TableRouteNotFoundSnafu { + table_id: physical_table_id, + })?; + match table_route { + TableRouteValue::Physical(x) => { + physical_table_routes.insert(logical_table_id, x.clone()); + } + TableRouteValue::Logical(x) => { + // Never get here, because we use a physical table id cannot obtain a logical table. + MetadataCorruptionSnafu { + err_msg: format!( + "logical table {} {:?} cannot be resolved to a physical table.", + logical_table_id, x + ), + } + .fail()?; + } + } + } + + Ok(physical_table_routes) + } + /// It may return a subset of the `table_ids`. pub async fn batch_get( &self,