diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index b078576fde..fdc2de38b6 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -188,7 +188,71 @@ pub const CACHE_KEY_PREFIXES: [&str; 5] = [ NODE_ADDRESS_PREFIX, ]; -pub type RegionDistribution = BTreeMap>; +/// A set of regions with the same role. +#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)] +pub struct RegionRoleSet { + /// Leader regions. + pub leader_regions: Vec, + /// Follower regions. + pub follower_regions: Vec, +} + +impl<'de> Deserialize<'de> for RegionRoleSet { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(untagged)] + enum RegionRoleSetOrLeaderOnly { + Full { + leader_regions: Vec, + follower_regions: Vec, + }, + LeaderOnly(Vec), + } + match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? { + RegionRoleSetOrLeaderOnly::Full { + leader_regions, + follower_regions, + } => Ok(RegionRoleSet::new(leader_regions, follower_regions)), + RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => { + Ok(RegionRoleSet::new(leader_regions, vec![])) + } + } + } +} + +impl RegionRoleSet { + /// Create a new region role set. + pub fn new(leader_regions: Vec, follower_regions: Vec) -> Self { + Self { + leader_regions, + follower_regions, + } + } + + /// Add a leader region to the set. + pub fn add_leader_region(&mut self, region_number: RegionNumber) { + self.leader_regions.push(region_number); + } + + /// Add a follower region to the set. + pub fn add_follower_region(&mut self, region_number: RegionNumber) { + self.follower_regions.push(region_number); + } + + /// Sort the regions. + pub fn sort(&mut self) { + self.follower_regions.sort(); + self.leader_regions.sort(); + } +} + +/// The distribution of regions. +/// +/// The key is the datanode id, the value is the region role set. +pub type RegionDistribution = BTreeMap; /// The id of flow. pub type FlowId = u32; @@ -1368,7 +1432,8 @@ mod tests { use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::key::{ - DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue, TOPIC_REGION_PREFIX, + DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager, + ViewInfoValue, TOPIC_REGION_PREFIX, }; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackend; @@ -1995,7 +2060,8 @@ mod tests { .unwrap() .unwrap(); - assert_eq!(got.regions, regions) + assert_eq!(got.regions, regions.leader_regions); + assert_eq!(got.follower_regions, regions.follower_regions); } } @@ -2412,4 +2478,28 @@ mod tests { assert_eq!(current_view_info.columns, new_columns); assert_eq!(current_view_info.plan_columns, new_plan_columns); } + + #[test] + fn test_region_role_set_deserialize() { + let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#; + let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap(); + assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]); + assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]); + + let s = r#"[1, 2, 3]"#; + let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap(); + assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]); + assert!(region_role_set.follower_regions.is_empty()); + } + + #[test] + fn test_region_distribution_deserialize() { + let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#; + let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap(); + assert_eq!(region_distribution.len(), 2); + assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]); + assert!(region_distribution[&1].follower_regions.is_empty()); + assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]); + assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]); + } } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 7c182288ec..19b4e7da48 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -24,7 +24,7 @@ use table::metadata::TableId; use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result}; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::{ - MetadataKey, MetadataValue, RegionDistribution, DATANODE_TABLE_KEY_PATTERN, + MetadataKey, MetadataValue, RegionDistribution, RegionRoleSet, DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX, }; use crate::kv_backend::txn::{Txn, TxnOp}; @@ -118,23 +118,31 @@ impl Display for DatanodeTableKey { pub struct DatanodeTableValue { pub table_id: TableId, pub regions: Vec, + #[serde(default)] + pub follower_regions: Vec, #[serde(flatten)] pub region_info: RegionInfo, version: u64, } impl DatanodeTableValue { - pub fn new(table_id: TableId, regions: Vec, region_info: RegionInfo) -> Self { + pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self { + let RegionRoleSet { + leader_regions, + follower_regions, + } = region_role_set; + Self { table_id, - regions, + regions: leader_regions, + follower_regions, region_info, version: 0, } } } -/// Decodes `KeyValue` to ((),`DatanodeTableValue`) +/// Decodes [`KeyValue`] to [`DatanodeTableValue`]. pub fn datanode_table_value_decoder(kv: KeyValue) -> Result { DatanodeTableValue::try_from_raw_value(&kv.value) } @@ -373,10 +381,11 @@ mod tests { let value = DatanodeTableValue { table_id: 42, regions: vec![1, 2, 3], + follower_regions: vec![], region_info: RegionInfo::default(), version: 1, }; - let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#; + let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#; let raw_value = value.try_as_raw_value().unwrap(); assert_eq!(raw_value, literal); @@ -467,6 +476,7 @@ mod tests { let table_value = DatanodeTableValue { table_id: 1, regions: vec![], + follower_regions: vec![], region_info, version: 1, }; diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 2386ca73a7..b7d14202f9 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -40,17 +40,23 @@ pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution let mut regions_id_map = RegionDistribution::new(); for route in region_routes.iter() { if let Some(peer) = route.leader_peer.as_ref() { - let region_id = route.region.id.region_number(); - regions_id_map.entry(peer.id).or_default().push(region_id); + let region_number = route.region.id.region_number(); + regions_id_map + .entry(peer.id) + .or_default() + .add_leader_region(region_number); } for peer in route.follower_peers.iter() { - let region_id = route.region.id.region_number(); - regions_id_map.entry(peer.id).or_default().push(region_id); + let region_number = route.region.id.region_number(); + regions_id_map + .entry(peer.id) + .or_default() + .add_follower_region(region_number); } } - for (_, regions) in regions_id_map.iter_mut() { - // id asc - regions.sort() + for (_, region_role_set) in regions_id_map.iter_mut() { + // Sort the regions in ascending order. + region_role_set.sort() } regions_id_map } @@ -455,6 +461,7 @@ impl From for Partition { #[cfg(test)] mod tests { use super::*; + use crate::key::RegionRoleSet; #[test] fn test_leader_is_downgraded() { @@ -611,8 +618,8 @@ mod tests { let distribution = region_distribution(®ion_routes); assert_eq!(distribution.len(), 3); - assert_eq!(distribution[&1], vec![1, 2]); - assert_eq!(distribution[&2], vec![1, 2]); - assert_eq!(distribution[&3], vec![1, 2]); + assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2])); + assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1])); + assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2])); } } diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 15823fb498..92e90fec1c 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [features] testing = [] +enterprise = [] [lints] workspace = true diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index fddca06ed7..47f34c8f9b 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -559,6 +559,8 @@ async fn open_all_regions( init_regions_parallelism: usize, ) -> Result<()> { let mut regions = vec![]; + #[cfg(feature = "enterprise")] + let mut follower_regions = vec![]; for table_value in table_values { for region_number in table_value.regions { // Augments region options with wal options if a wal options is provided. @@ -576,6 +578,24 @@ async fn open_all_regions( region_options, )); } + + #[cfg(feature = "enterprise")] + for region_number in table_value.follower_regions { + // Augments region options with wal options if a wal options is provided. + let mut region_options = table_value.region_info.region_options.clone(); + prepare_wal_options( + &mut region_options, + RegionId::new(table_value.table_id, region_number), + &table_value.region_info.region_wal_options, + ); + + follower_regions.push(( + RegionId::new(table_value.table_id, region_number), + table_value.region_info.engine.clone(), + table_value.region_info.region_storage_path.clone(), + region_options, + )); + } } let num_regions = regions.len(); info!("going to open {} region(s)", num_regions); @@ -617,6 +637,43 @@ async fn open_all_regions( } } } + + #[cfg(feature = "enterprise")] + if !follower_regions.is_empty() { + info!( + "going to open {} follower region(s)", + follower_regions.len() + ); + let mut region_requests = Vec::with_capacity(follower_regions.len()); + for (region_id, engine, store_path, options) in follower_regions { + let region_dir = region_dir(&store_path, region_id); + region_requests.push(( + region_id, + RegionOpenRequest { + engine, + region_dir, + options, + skip_wal_replay: true, + }, + )); + } + + let open_regions = region_server + .handle_batch_open_requests(init_regions_parallelism, region_requests) + .await?; + + ensure!( + open_regions.len() == num_regions, + error::UnexpectedSnafu { + violated: format!( + "Expected to open {} of follower regions, only {} of regions has opened", + num_regions, + open_regions.len() + ) + } + ); + } + info!("all regions are opened"); Ok(()) @@ -632,6 +689,7 @@ mod tests { use common_base::Plugins; use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::key::datanode_table::DatanodeTableManager; + use common_meta::key::RegionRoleSet; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; use mito2::engine::MITO_ENGINE_NAME; @@ -651,7 +709,7 @@ mod tests { "foo/bar/weny", HashMap::from([("foo".to_string(), "bar".to_string())]), HashMap::default(), - BTreeMap::from([(0, vec![0, 1, 2])]), + BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]), ) .unwrap(); diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index d09bbc3761..19ff1fa3ae 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -857,7 +857,7 @@ CREATE TABLE {table_name} ( .expect("physical table route"), ) .iter() - .map(|(k, v)| (v[0], *k)) + .map(|(k, v)| (v.leader_regions[0], *k)) .collect::>(); assert!(region_to_dn_map.len() <= instance.datanodes().len()); diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 39ca6ae498..e23c4234be 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -224,7 +224,7 @@ mod tests { .expect("region routes should be physical"), ) .iter() - .map(|(k, v)| (v[0], *k)) + .map(|(k, v)| (v.leader_regions[0], *k)) .collect::>(); assert!(region_to_dn_map.len() <= instance.datanodes().len()); diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index 553160ac42..d9f1c61082 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use client::{OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_meta::key::{RegionDistribution, TableMetadataManagerRef}; +use common_meta::key::{RegionDistribution, RegionRoleSet, TableMetadataManagerRef}; use common_meta::peer::Peer; use common_query::Output; use common_recordbatch::RecordBatches; @@ -166,7 +166,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec