diff --git a/codecov.yml b/codecov.yml index 506f3445de..526ce56407 100644 --- a/codecov.yml +++ b/codecov.yml @@ -8,5 +8,6 @@ coverage: ignore: - "**/error*.rs" # ignore all error.rs files - "tests/runner/*.rs" # ignore integration test runner + - "tests-integration/**/*.rs" # ignore integration tests comment: # this is a top-level key layout: "diff" diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 0d149c3af7..84f5ada3dd 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -206,12 +206,12 @@ impl CreateTableProcedure { .context .table_metadata_manager .table_route_manager() - .get(physical_table_id) + .try_get_physical_table_route(physical_table_id) .await? .context(TableRouteNotFoundSnafu { table_id: physical_table_id, })?; - let region_routes = physical_table_route.region_routes()?; + let region_routes = &physical_table_route.region_routes; let request_builder = self.new_region_request_builder(Some(physical_table_id))?; diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index c2007377e5..1d4ee73f9e 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -363,8 +363,10 @@ impl TableMetadataManager { Option>, Option>, )> { - let (get_table_route_txn, table_route_decoder) = - self.table_route_manager.build_get_txn(table_id); + let (get_table_route_txn, table_route_decoder) = self + .table_route_manager + .table_route_storage() + .build_get_txn(table_id); let (get_table_info_txn, table_info_decoder) = self.table_info_manager.build_get_txn(table_id); @@ -414,6 +416,7 @@ impl TableMetadataManager { let (create_table_route_txn, on_create_table_route_failure) = self .table_route_manager() + .table_route_storage() .build_create_txn(table_id, &table_route_value)?; let mut txn = Txn::merge_all(vec![ @@ -506,6 +509,7 @@ impl TableMetadataManager { let (create_table_route_txn, on_create_table_route_failure) = self .table_route_manager() + .table_route_storage() .build_create_txn(table_id, &table_route_value)?; txns.push(create_table_route_txn); @@ -579,6 +583,7 @@ impl TableMetadataManager { // Deletes table route. let delete_table_route_txn = self .table_route_manager() + .table_route_storage() .build_delete_txn(table_id, table_route_value)?; let txn = Txn::merge_all(vec![ @@ -713,6 +718,7 @@ impl TableMetadataManager { let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() + .table_route_storage() .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?; let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]); @@ -765,6 +771,7 @@ impl TableMetadataManager { let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() + .table_route_storage() .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?; let r = self.kv_backend.txn(update_table_route_txn).await?; @@ -1096,6 +1103,7 @@ mod tests { assert!(table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() @@ -1120,7 +1128,8 @@ mod tests { let removed_table_route = table_metadata_manager .table_route_manager() - .get_removed(table_id) + .table_route_storage() + .get_raw_removed(table_id) .await .unwrap() .unwrap() @@ -1316,6 +1325,7 @@ mod tests { let updated_route_value = table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index a932f14740..4cdf81140d 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -22,7 +22,7 @@ use table::metadata::TableId; use super::{txn_helper, DeserializedValueWithBytes, TableMetaValue}; use crate::error::{ - MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, + self, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu, }; use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX}; @@ -77,7 +77,7 @@ impl TableRouteValue { err_msg: format!("{self:?} is a non-physical TableRouteValue."), } ); - let version = self.physical_table_route().version; + let version = self.as_physical_table_route_ref().version; Ok(Self::Physical(PhysicalTableRouteValue { region_routes, version: version + 1, @@ -95,7 +95,7 @@ impl TableRouteValue { err_msg: format!("{self:?} is a non-physical TableRouteValue."), } ); - Ok(self.physical_table_route().version) + Ok(self.as_physical_table_route_ref().version) } /// Returns the corresponding [RegionRoute], returns `None` if it's the specific region is not found. @@ -109,7 +109,7 @@ impl TableRouteValue { } ); Ok(self - .physical_table_route() + .as_physical_table_route_ref() .region_routes .iter() .find(|route| route.region.id == region_id) @@ -129,10 +129,25 @@ impl TableRouteValue { err_msg: format!("{self:?} is a non-physical TableRouteValue."), } ); - Ok(&self.physical_table_route().region_routes) + Ok(&self.as_physical_table_route_ref().region_routes) } - fn physical_table_route(&self) -> &PhysicalTableRouteValue { + /// Returns the reference of [`PhysicalTableRouteValue`]. + /// + /// # Panic + /// If it is not the [`PhysicalTableRouteValue`]. + fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue { + match self { + TableRouteValue::Physical(x) => x, + _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"), + } + } + + /// Converts to [`PhysicalTableRouteValue`]. + /// + /// # Panic + /// If it is not the [`PhysicalTableRouteValue`]. + fn into_physical_table_route(self) -> PhysicalTableRouteValue { match self { TableRouteValue::Physical(x) => x, _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"), @@ -213,111 +228,53 @@ impl Display for TableRouteKey { } pub struct TableRouteManager { - kv_backend: KvBackendRef, + storage: TableRouteStorage, } impl TableRouteManager { pub fn new(kv_backend: KvBackendRef) -> Self { - Self { kv_backend } + Self { + storage: TableRouteStorage::new(kv_backend), + } } - pub(crate) fn build_get_txn( + /// Returns the [`PhysicalTableRouteValue`] in the first level, + /// It won't follow the [`LogicalTableRouteValue`] to find the next level [`PhysicalTableRouteValue`]. + /// + /// Returns an error if the first level value is not a [`PhysicalTableRouteValue`]. + pub async fn try_get_physical_table_route( &self, table_id: TableId, - ) -> ( - Txn, - impl FnOnce(&Vec) -> Result>>, - ) { - let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); - let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]); - - (txn, txn_helper::build_txn_response_decoder_fn(raw_key)) - } - - /// Builds a create table route transaction. it expected the `__table_route/{table_id}` wasn't occupied. - pub fn build_create_txn( - &self, - table_id: TableId, - table_route_value: &TableRouteValue, - ) -> Result<( - Txn, - impl FnOnce(&Vec) -> Result>>, - )> { - let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); - - let txn = txn_helper::build_put_if_absent_txn( - raw_key.clone(), - table_route_value.try_as_raw_value()?, - ); - - Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key))) - } - - /// Builds a update table route transaction, it expected the remote value equals the `current_table_route_value`. - /// It retrieves the latest value if the comparing failed. - pub(crate) fn build_update_txn( - &self, - table_id: TableId, - current_table_route_value: &DeserializedValueWithBytes, - new_table_route_value: &TableRouteValue, - ) -> Result<( - Txn, - impl FnOnce(&Vec) -> Result>>, - )> { - let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); - let raw_value = current_table_route_value.get_raw_bytes(); - let new_raw_value: Vec = new_table_route_value.try_as_raw_value()?; - - let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value); - - Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key))) - } - - /// Builds a delete table route transaction, it expected the remote value equals the `table_route_value`. - pub(crate) fn build_delete_txn( - &self, - table_id: TableId, - table_route_value: &DeserializedValueWithBytes, - ) -> Result { - let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); - let raw_value = table_route_value.get_raw_bytes(); - let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); - - let txn = Txn::new().and_then(vec![ - TxnOp::Delete(raw_key), - TxnOp::Put(removed_key.into_bytes(), raw_value), - ]); - - Ok(txn) - } - - pub async fn get( - &self, - table_id: TableId, - ) -> Result>> { - let key = TableRouteKey::new(table_id); - self.kv_backend - .get(&key.as_raw_key()) - .await? - .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value)) - .transpose() + ) -> Result> { + match self.storage.get(table_id).await? { + Some(route) => { + ensure!( + route.is_physical(), + error::UnexpectedLogicalRouteTableSnafu { + err_msg: format!("{route:?} is a non-physical TableRouteValue.") + } + ); + Ok(Some(route.into_physical_table_route())) + } + None => Ok(None), + } } + /// Returns the [TableId] recursively. + /// + /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if: + /// - the table(`logical_or_physical_table_id`) does not exist. pub async fn get_physical_table_id( &self, logical_or_physical_table_id: TableId, ) -> Result { let table_route = self + .storage .get(logical_or_physical_table_id) .await? .context(TableRouteNotFoundSnafu { table_id: logical_or_physical_table_id, - })? - .into_inner(); + })?; match table_route { TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id), @@ -325,46 +282,58 @@ impl TableRouteManager { } } - /// Returns the [TableRouteValue::Physical] of table. + /// Returns the [TableRouteValue::Physical] recursively. /// /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if: - /// - the physical table(`logical_or_physical_table_id`) does not exists - /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exists. + /// - the physical table(`logical_or_physical_table_id`) does not exist + /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist. pub async fn get_physical_table_route( &self, logical_or_physical_table_id: TableId, ) -> Result<(TableId, PhysicalTableRouteValue)> { let table_route = self + .storage .get(logical_or_physical_table_id) .await? .context(TableRouteNotFoundSnafu { table_id: logical_or_physical_table_id, - })? - .into_inner(); + })?; match table_route { TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)), TableRouteValue::Logical(x) => { let physical_table_id = x.physical_table_id(); - let physical_table_route = - self.get(physical_table_id) - .await? - .context(TableRouteNotFoundSnafu { - table_id: physical_table_id, - })?; - Ok(( - physical_table_id, - physical_table_route.physical_table_route().clone(), - )) + let physical_table_route = self.storage.get(physical_table_id).await?.context( + TableRouteNotFoundSnafu { + table_id: physical_table_id, + }, + )?; + let physical_table_route = physical_table_route.into_physical_table_route(); + Ok((physical_table_id, physical_table_route)) } } } + /// Returns the [TableRouteValue::Physical] recursively. + /// + /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if: + /// - one of the logical tables corresponding to the physical table does not exist. + /// + /// **Notes**: it may return a subset of `logical_or_physical_table_ids`. pub async fn batch_get_physical_table_routes( &self, logical_or_physical_table_ids: &[TableId], ) -> Result> { - let table_routes = self.batch_get(logical_or_physical_table_ids).await?; + let table_routes = self + .storage + .batch_get(logical_or_physical_table_ids) + .await?; + // Returns a subset of `logical_or_physical_table_ids`. + let table_routes = table_routes + .into_iter() + .zip(logical_or_physical_table_ids) + .filter_map(|(route, id)| route.map(|route| (*id, route))) + .collect::>(); let mut physical_table_routes = HashMap::with_capacity(table_routes.len()); let mut logical_table_ids = HashMap::with_capacity(table_routes.len()); @@ -384,13 +353,22 @@ impl TableRouteManager { return Ok(physical_table_routes); } + // Finds the logical tables corresponding to the physical tables. let physical_table_ids = logical_table_ids .values() .cloned() .collect::>() .into_iter() .collect::>(); - let table_routes = self.batch_get(&physical_table_ids).await?; + let table_routes = self + .table_route_storage() + .batch_get(&physical_table_ids) + .await?; + let table_routes = table_routes + .into_iter() + .zip(physical_table_ids) + .filter_map(|(route, id)| route.map(|route| (id, route))) + .collect::>(); for (logical_table_id, physical_table_id) in logical_table_ids { let table_route = @@ -419,40 +397,114 @@ impl TableRouteManager { Ok(physical_table_routes) } - /// It may return a subset of the `table_ids`. - pub async fn batch_get( + /// Returns [`RegionDistribution`] of the table(`table_id`). + pub async fn get_region_distribution( &self, - table_ids: &[TableId], - ) -> Result> { - let lookup_table = table_ids - .iter() - .map(|id| (TableRouteKey::new(*id).as_raw_key(), id)) - .collect::>(); + table_id: TableId, + ) -> Result> { + self.storage + .get(table_id) + .await? + .map(|table_route| Ok(region_distribution(table_route.region_routes()?))) + .transpose() + } - let resp = self - .kv_backend - .batch_get(BatchGetRequest { - keys: lookup_table.keys().cloned().collect::>(), - }) - .await?; + /// Returns low-level APIs. + pub fn table_route_storage(&self) -> &TableRouteStorage { + &self.storage + } +} - let values = resp - .kvs - .iter() - .map(|kv| { - Ok(( - // Safety: must exist. - **lookup_table.get(kv.key()).unwrap(), - TableRouteValue::try_from_raw_value(&kv.value)?, - )) - }) - .collect::>>()?; +/// Low-level operations of [TableRouteValue]. +pub struct TableRouteStorage { + kv_backend: KvBackendRef, +} - Ok(values) +impl TableRouteStorage { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Builds a get table route transaction(readonly). + pub(crate) fn build_get_txn( + &self, + table_id: TableId, + ) -> ( + Txn, + impl FnOnce(&Vec) -> Result>>, + ) { + let key = TableRouteKey::new(table_id); + let raw_key = key.as_raw_key(); + let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]); + + (txn, txn_helper::build_txn_response_decoder_fn(raw_key)) + } + + /// Builds a create table route transaction, + /// it expected the `__table_route/{table_id}` wasn't occupied. + pub fn build_create_txn( + &self, + table_id: TableId, + table_route_value: &TableRouteValue, + ) -> Result<( + Txn, + impl FnOnce(&Vec) -> Result>>, + )> { + let key = TableRouteKey::new(table_id); + let raw_key = key.as_raw_key(); + + let txn = txn_helper::build_put_if_absent_txn( + raw_key.clone(), + table_route_value.try_as_raw_value()?, + ); + + Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key))) + } + + /// Builds a update table route transaction, + /// it expected the remote value equals the `current_table_route_value`. + /// It retrieves the latest value if the comparing failed. + pub(crate) fn build_update_txn( + &self, + table_id: TableId, + current_table_route_value: &DeserializedValueWithBytes, + new_table_route_value: &TableRouteValue, + ) -> Result<( + Txn, + impl FnOnce(&Vec) -> Result>>, + )> { + let key = TableRouteKey::new(table_id); + let raw_key = key.as_raw_key(); + let raw_value = current_table_route_value.get_raw_bytes(); + let new_raw_value: Vec = new_table_route_value.try_as_raw_value()?; + + let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value); + + Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key))) + } + + /// Builds a delete table route transaction, + /// it expected the remote value equals the `table_route_value`. + pub(crate) fn build_delete_txn( + &self, + table_id: TableId, + table_route_value: &DeserializedValueWithBytes, + ) -> Result { + let key = TableRouteKey::new(table_id); + let raw_key = key.as_raw_key(); + let raw_value = table_route_value.get_raw_bytes(); + let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key)); + + let txn = Txn::new().and_then(vec![ + TxnOp::Delete(raw_key), + TxnOp::Put(removed_key.into_bytes(), raw_value), + ]); + + Ok(txn) } #[cfg(test)] - pub async fn get_removed( + pub async fn get_raw_removed( &self, table_id: TableId, ) -> Result>> { @@ -465,20 +517,64 @@ impl TableRouteManager { .transpose() } - pub async fn get_region_distribution( + /// Returns the [`TableRouteValue`]. + pub async fn get(&self, table_id: TableId) -> Result> { + let key = TableRouteKey::new(table_id); + self.kv_backend + .get(&key.as_raw_key()) + .await? + .map(|kv| TableRouteValue::try_from_raw_value(&kv.value)) + .transpose() + } + + /// Returns the [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`]. + pub async fn get_raw( &self, table_id: TableId, - ) -> Result> { - self.get(table_id) + ) -> Result>> { + let key = TableRouteKey::new(table_id); + self.kv_backend + .get(&key.as_raw_key()) .await? - .map(|table_route| Ok(region_distribution(table_route.region_routes()?))) + .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value)) .transpose() } + + /// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`. + pub async fn batch_get(&self, table_ids: &[TableId]) -> Result>> { + let keys = table_ids + .iter() + .map(|id| TableRouteKey::new(*id).as_raw_key()) + .collect::>(); + let resp = self + .kv_backend + .batch_get(BatchGetRequest { keys: keys.clone() }) + .await?; + + let kvs = resp + .kvs + .into_iter() + .map(|kv| (kv.key, kv.value)) + .collect::>(); + keys.into_iter() + .map(|key| { + if let Some(value) = kvs.get(&key) { + Ok(Some(TableRouteValue::try_from_raw_value(value)?)) + } else { + Ok(None) + } + }) + .collect::>>() + } } #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::TxnService; #[test] fn test_table_route_compatibility() { @@ -491,4 +587,81 @@ mod tests { r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }], version: 0 })"# ); } + + #[tokio::test] + async fn test_table_route_storage_get_raw_empty() { + let kv = Arc::new(MemoryKvBackend::default()); + let table_route_storage = TableRouteStorage::new(kv); + let table_route = table_route_storage.get_raw(1024).await.unwrap(); + assert!(table_route.is_none()); + } + + #[tokio::test] + async fn test_table_route_storage_get_raw() { + let kv = Arc::new(MemoryKvBackend::default()); + let table_route_storage = TableRouteStorage::new(kv.clone()); + let table_route = table_route_storage.get_raw(1024).await.unwrap(); + assert!(table_route.is_none()); + let table_route_manager = TableRouteManager::new(kv.clone()); + let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue { + physical_table_id: 1023, + region_ids: vec![RegionId::new(1023, 1)], + }); + let (txn, _) = table_route_manager + .table_route_storage() + .build_create_txn(1024, &table_route_value) + .unwrap(); + let r = kv.txn(txn).await.unwrap(); + assert!(r.succeeded); + let table_route = table_route_storage.get_raw(1024).await.unwrap(); + assert!(table_route.is_some()); + let got = table_route.unwrap().inner; + assert_eq!(got, table_route_value); + } + + #[tokio::test] + async fn test_table_route_batch_get() { + let kv = Arc::new(MemoryKvBackend::default()); + let table_route_storage = TableRouteStorage::new(kv.clone()); + let routes = table_route_storage + .batch_get(&[1023, 1024, 1025]) + .await + .unwrap(); + + assert!(routes.iter().all(Option::is_none)); + let table_route_manager = TableRouteManager::new(kv.clone()); + let routes = [ + ( + 1024, + TableRouteValue::Logical(LogicalTableRouteValue { + physical_table_id: 1023, + region_ids: vec![RegionId::new(1023, 1)], + }), + ), + ( + 1025, + TableRouteValue::Logical(LogicalTableRouteValue { + physical_table_id: 1023, + region_ids: vec![RegionId::new(1023, 2)], + }), + ), + ]; + for (table_id, route) in &routes { + let (txn, _) = table_route_manager + .table_route_storage() + .build_create_txn(*table_id, route) + .unwrap(); + let r = kv.txn(txn).await.unwrap(); + assert!(r.succeeded); + } + + let results = table_route_storage + .batch_get(&[9999, 1025, 8888, 1024]) + .await + .unwrap(); + assert!(results[0].is_none()); + assert_eq!(results[1].as_ref().unwrap(), &routes[1].1); + assert!(results[2].is_none()); + assert_eq!(results[3].as_ref().unwrap(), &routes[0].1); + } } diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index 650c794126..d6e2c08894 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -52,7 +52,8 @@ impl DeactivateRegion { let table_route_value = ctx .table_metadata_manager .table_route_manager() - .get(table_id) + .table_route_storage() + .get_raw(table_id) .await .context(error::TableMetadataManagerSnafu)? .context(error::TableRouteNotFoundSnafu { table_id })?; @@ -201,6 +202,7 @@ mod tests { .context .table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 542b02ca08..6302d20eee 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -82,7 +82,8 @@ impl UpdateRegionMetadata { let table_route_value = ctx .table_metadata_manager .table_route_manager() - .get(table_id) + .table_route_storage() + .get_raw(table_id) .await .context(error::TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; @@ -233,7 +234,8 @@ mod tests { env.context .table_metadata_manager .table_route_manager() - .get(table_id) + .table_route_storage() + .get_raw(table_id) .await .unwrap() .unwrap() @@ -396,11 +398,11 @@ mod tests { .context .table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() - .unwrap() - .into_inner(); + .unwrap(); let peers = &extract_all_peers(table_route_value.region_routes().unwrap()); let actual = table_route_value.region_routes().unwrap(); @@ -416,11 +418,11 @@ mod tests { let manager = &env.context.table_metadata_manager; let table_route_value = manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() - .unwrap() - .into_inner(); + .unwrap(); let map = region_distribution(table_route_value.region_routes().unwrap()); assert_eq!(map.len(), 2); diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 77e1493cfc..011be7c88c 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -218,7 +218,8 @@ impl Context { let table_route = self .table_metadata_manager .table_route_manager() - .get(table_id) + .table_route_storage() + .get_raw(table_id) .await .context(error::TableMetadataManagerSnafu) .map_err(BoxedError::new) @@ -803,6 +804,7 @@ mod tests { .env() .table_metadata_manager() .table_route_manager() + .table_route_storage() .get(region_id.table_id()) .await .unwrap() diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index ad1b88efe0..000520ee57 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -177,6 +177,7 @@ impl RegionMigrationManager { .context_factory .table_metadata_manager .table_route_manager() + .table_route_storage() .get(region_id.table_id()) .await .context(error::TableMetadataManagerSnafu)? @@ -184,7 +185,7 @@ impl RegionMigrationManager { table_id: region_id.table_id(), })?; - Ok(table_route.into_inner()) + Ok(table_route) } /// Verifies the type of region migration table route. diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index c09d18c896..8d22fd8104 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -416,11 +416,11 @@ impl ProcedureMigrationTestSuite { .env .table_metadata_manager .table_route_manager() + .table_route_storage() .get(region_id.table_id()) .await .unwrap() - .unwrap() - .into_inner(); + .unwrap(); let region_routes = table_route.region_routes().unwrap(); let expected_leader_id = self.context.persistent_ctx.to_peer.id; diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index e018053407..1404b8aca2 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -145,7 +145,8 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); let original_table_route = table_metadata_manager .table_route_manager() - .get(table_id) + .table_route_storage() + .get_raw(table_id) .await .unwrap() .unwrap(); @@ -201,6 +202,7 @@ mod tests { let latest_table_route = table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() @@ -243,6 +245,7 @@ mod tests { let latest_table_route = table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 81e73e270a..c9253be2d5 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -134,7 +134,8 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); let old_table_route = table_metadata_manager .table_route_manager() - .get(table_id) + .table_route_storage() + .get_raw(table_id) .await .unwrap() .unwrap(); @@ -165,11 +166,11 @@ mod tests { let table_route = table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() - .unwrap() - .into_inner(); + .unwrap(); assert_eq!( &expected_region_routes, table_route.region_routes().unwrap() @@ -229,11 +230,11 @@ mod tests { let table_route = table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() - .unwrap() - .into_inner(); + .unwrap(); assert_eq!( &expected_region_routes, table_route.region_routes().unwrap() diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 2c5a5f61d0..9272491e19 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -335,7 +335,8 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); let original_table_route = table_metadata_manager .table_route_manager() - .get(table_id) + .table_route_storage() + .get_raw(table_id) .await .unwrap() .unwrap(); @@ -473,11 +474,11 @@ mod tests { let table_route = table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() - .unwrap() - .into_inner(); + .unwrap(); let region_routes = table_route.region_routes().unwrap(); assert!(ctx.volatile_ctx.table_route.is_none()); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 39b984476a..669c9e56c0 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -261,6 +261,7 @@ async fn test_on_datanode_create_logical_regions() { let physical_route_txn = ctx .table_metadata_manager .table_route_manager() + .table_route_storage() .build_create_txn(physical_table_id, &physical_table_route) .unwrap() .0; diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 9aea23232b..968deb7e12 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -105,11 +105,18 @@ impl RegionLeaseKeeper { // The subset of all table metadata. // TODO: considers storing all active regions in meta's memory. - let metadata_subset = table_route_manager + let table_routes = table_route_manager + .table_route_storage() .batch_get(table_ids) .await .context(error::TableMetadataManagerSnafu)?; + let metadata_subset = table_routes + .into_iter() + .zip(table_ids) + .filter_map(|(route, id)| route.map(|route| (*id, route))) + .collect::>(); + Ok(metadata_subset) } diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 9573757a3f..5f89a5cfeb 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -138,6 +138,7 @@ async fn get_leader_peer_ids( ) -> Result> { table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .context(error::TableMetadataManagerSnafu) diff --git a/src/meta-srv/src/service/admin/route.rs b/src/meta-srv/src/service/admin/route.rs index 217133ad3d..4339e1a564 100644 --- a/src/meta-srv/src/service/admin/route.rs +++ b/src/meta-srv/src/service/admin/route.rs @@ -54,6 +54,7 @@ impl HttpHandler for RouteHandler { let table_route_value = self .table_metadata_manager .table_route_manager() + .table_route_storage() .get(table_id) .await .context(TableMetadataManagerSnafu)? diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 9076d48765..a7572066f7 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -516,11 +516,11 @@ CREATE TABLE {table_name} ( let table_route_value = instance .table_metadata_manager() .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() - .unwrap() - .into_inner(); + .unwrap(); let region_to_dn_map = region_distribution( table_route_value diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 556c4a2ebc..d1f4c8929d 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -211,11 +211,11 @@ mod tests { let table_route_value = manager .table_route_manager() + .table_route_storage() .get(table_id) .await .unwrap() - .unwrap() - .into_inner(); + .unwrap(); let region_to_dn_map = region_distribution( table_route_value