fix: skip wal replay when opening follower regions (#6234)

* chore: introduce `follower_regions` field for `DatanodeTableValue`

* fix: skip wal replay when opening follower regions

* chore: add enterprise feature gate

* fix: fix unit tests

* feat: improve RegionRoleSet backward compatibility
This commit is contained in:
Weny Xu
2025-06-06 15:29:35 +08:00
committed by GitHub
parent 0d4f27a699
commit 06b1627da5
8 changed files with 217 additions and 39 deletions

View File

@@ -188,7 +188,71 @@ pub const CACHE_KEY_PREFIXES: [&str; 5] = [
NODE_ADDRESS_PREFIX,
];
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
/// A set of regions with the same role.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct RegionRoleSet {
/// Leader regions.
pub leader_regions: Vec<RegionNumber>,
/// Follower regions.
pub follower_regions: Vec<RegionNumber>,
}
impl<'de> Deserialize<'de> for RegionRoleSet {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum RegionRoleSetOrLeaderOnly {
Full {
leader_regions: Vec<RegionNumber>,
follower_regions: Vec<RegionNumber>,
},
LeaderOnly(Vec<RegionNumber>),
}
match RegionRoleSetOrLeaderOnly::deserialize(deserializer)? {
RegionRoleSetOrLeaderOnly::Full {
leader_regions,
follower_regions,
} => Ok(RegionRoleSet::new(leader_regions, follower_regions)),
RegionRoleSetOrLeaderOnly::LeaderOnly(leader_regions) => {
Ok(RegionRoleSet::new(leader_regions, vec![]))
}
}
}
}
impl RegionRoleSet {
/// Create a new region role set.
pub fn new(leader_regions: Vec<RegionNumber>, follower_regions: Vec<RegionNumber>) -> Self {
Self {
leader_regions,
follower_regions,
}
}
/// Add a leader region to the set.
pub fn add_leader_region(&mut self, region_number: RegionNumber) {
self.leader_regions.push(region_number);
}
/// Add a follower region to the set.
pub fn add_follower_region(&mut self, region_number: RegionNumber) {
self.follower_regions.push(region_number);
}
/// Sort the regions.
pub fn sort(&mut self) {
self.follower_regions.sort();
self.leader_regions.sort();
}
}
/// The distribution of regions.
///
/// The key is the datanode id, the value is the region role set.
pub type RegionDistribution = BTreeMap<DatanodeId, RegionRoleSet>;
/// The id of flow.
pub type FlowId = u32;
@@ -1368,7 +1432,8 @@ mod tests {
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::{
DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue, TOPIC_REGION_PREFIX,
DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TableMetadataManager,
ViewInfoValue, TOPIC_REGION_PREFIX,
};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
@@ -1995,7 +2060,8 @@ mod tests {
.unwrap()
.unwrap();
assert_eq!(got.regions, regions)
assert_eq!(got.regions, regions.leader_regions);
assert_eq!(got.follower_regions, regions.follower_regions);
}
}
@@ -2412,4 +2478,28 @@ mod tests {
assert_eq!(current_view_info.columns, new_columns);
assert_eq!(current_view_info.plan_columns, new_plan_columns);
}
#[test]
fn test_region_role_set_deserialize() {
let s = r#"{"leader_regions": [1, 2, 3], "follower_regions": [4, 5, 6]}"#;
let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
assert_eq!(region_role_set.follower_regions, vec![4, 5, 6]);
let s = r#"[1, 2, 3]"#;
let region_role_set: RegionRoleSet = serde_json::from_str(s).unwrap();
assert_eq!(region_role_set.leader_regions, vec![1, 2, 3]);
assert!(region_role_set.follower_regions.is_empty());
}
#[test]
fn test_region_distribution_deserialize() {
let s = r#"{"1": [1,2,3], "2": {"leader_regions": [7, 8, 9], "follower_regions": [10, 11, 12]}}"#;
let region_distribution: RegionDistribution = serde_json::from_str(s).unwrap();
assert_eq!(region_distribution.len(), 2);
assert_eq!(region_distribution[&1].leader_regions, vec![1, 2, 3]);
assert!(region_distribution[&1].follower_regions.is_empty());
assert_eq!(region_distribution[&2].leader_regions, vec![7, 8, 9]);
assert_eq!(region_distribution[&2].follower_regions, vec![10, 11, 12]);
}
}

View File

@@ -24,7 +24,7 @@ use table::metadata::TableId;
use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::{
MetadataKey, MetadataValue, RegionDistribution, DATANODE_TABLE_KEY_PATTERN,
MetadataKey, MetadataValue, RegionDistribution, RegionRoleSet, DATANODE_TABLE_KEY_PATTERN,
DATANODE_TABLE_KEY_PREFIX,
};
use crate::kv_backend::txn::{Txn, TxnOp};
@@ -118,23 +118,31 @@ impl Display for DatanodeTableKey {
pub struct DatanodeTableValue {
pub table_id: TableId,
pub regions: Vec<RegionNumber>,
#[serde(default)]
pub follower_regions: Vec<RegionNumber>,
#[serde(flatten)]
pub region_info: RegionInfo,
version: u64,
}
impl DatanodeTableValue {
pub fn new(table_id: TableId, regions: Vec<RegionNumber>, region_info: RegionInfo) -> Self {
pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
let RegionRoleSet {
leader_regions,
follower_regions,
} = region_role_set;
Self {
table_id,
regions,
regions: leader_regions,
follower_regions,
region_info,
version: 0,
}
}
}
/// Decodes `KeyValue` to ((),`DatanodeTableValue`)
/// Decodes [`KeyValue`] to [`DatanodeTableValue`].
pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
DatanodeTableValue::try_from_raw_value(&kv.value)
}
@@ -373,10 +381,11 @@ mod tests {
let value = DatanodeTableValue {
table_id: 42,
regions: vec![1, 2, 3],
follower_regions: vec![],
region_info: RegionInfo::default(),
version: 1,
};
let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
let raw_value = value.try_as_raw_value().unwrap();
assert_eq!(raw_value, literal);
@@ -467,6 +476,7 @@ mod tests {
let table_value = DatanodeTableValue {
table_id: 1,
regions: vec![],
follower_regions: vec![],
region_info,
version: 1,
};

View File

@@ -40,17 +40,23 @@ pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution
let mut regions_id_map = RegionDistribution::new();
for route in region_routes.iter() {
if let Some(peer) = route.leader_peer.as_ref() {
let region_id = route.region.id.region_number();
regions_id_map.entry(peer.id).or_default().push(region_id);
let region_number = route.region.id.region_number();
regions_id_map
.entry(peer.id)
.or_default()
.add_leader_region(region_number);
}
for peer in route.follower_peers.iter() {
let region_id = route.region.id.region_number();
regions_id_map.entry(peer.id).or_default().push(region_id);
let region_number = route.region.id.region_number();
regions_id_map
.entry(peer.id)
.or_default()
.add_follower_region(region_number);
}
}
for (_, regions) in regions_id_map.iter_mut() {
// id asc
regions.sort()
for (_, region_role_set) in regions_id_map.iter_mut() {
// Sort the regions in ascending order.
region_role_set.sort()
}
regions_id_map
}
@@ -455,6 +461,7 @@ impl From<PbPartition> for Partition {
#[cfg(test)]
mod tests {
use super::*;
use crate::key::RegionRoleSet;
#[test]
fn test_leader_is_downgraded() {
@@ -611,8 +618,8 @@ mod tests {
let distribution = region_distribution(&region_routes);
assert_eq!(distribution.len(), 3);
assert_eq!(distribution[&1], vec![1, 2]);
assert_eq!(distribution[&2], vec![1, 2]);
assert_eq!(distribution[&3], vec![1, 2]);
assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
}
}

View File

@@ -6,6 +6,7 @@ license.workspace = true
[features]
testing = []
enterprise = []
[lints]
workspace = true

View File

@@ -559,6 +559,8 @@ async fn open_all_regions(
init_regions_parallelism: usize,
) -> Result<()> {
let mut regions = vec![];
#[cfg(feature = "enterprise")]
let mut follower_regions = vec![];
for table_value in table_values {
for region_number in table_value.regions {
// Augments region options with wal options if a wal options is provided.
@@ -576,6 +578,24 @@ async fn open_all_regions(
region_options,
));
}
#[cfg(feature = "enterprise")]
for region_number in table_value.follower_regions {
// Augments region options with wal options if a wal options is provided.
let mut region_options = table_value.region_info.region_options.clone();
prepare_wal_options(
&mut region_options,
RegionId::new(table_value.table_id, region_number),
&table_value.region_info.region_wal_options,
);
follower_regions.push((
RegionId::new(table_value.table_id, region_number),
table_value.region_info.engine.clone(),
table_value.region_info.region_storage_path.clone(),
region_options,
));
}
}
let num_regions = regions.len();
info!("going to open {} region(s)", num_regions);
@@ -617,6 +637,43 @@ async fn open_all_regions(
}
}
}
#[cfg(feature = "enterprise")]
if !follower_regions.is_empty() {
info!(
"going to open {} follower region(s)",
follower_regions.len()
);
let mut region_requests = Vec::with_capacity(follower_regions.len());
for (region_id, engine, store_path, options) in follower_regions {
let region_dir = region_dir(&store_path, region_id);
region_requests.push((
region_id,
RegionOpenRequest {
engine,
region_dir,
options,
skip_wal_replay: true,
},
));
}
let open_regions = region_server
.handle_batch_open_requests(init_regions_parallelism, region_requests)
.await?;
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of follower regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
}
info!("all regions are opened");
Ok(())
@@ -632,6 +689,7 @@ mod tests {
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::key::RegionRoleSet;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use mito2::engine::MITO_ENGINE_NAME;
@@ -651,7 +709,7 @@ mod tests {
"foo/bar/weny",
HashMap::from([("foo".to_string(), "bar".to_string())]),
HashMap::default(),
BTreeMap::from([(0, vec![0, 1, 2])]),
BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
)
.unwrap();

View File

@@ -857,7 +857,7 @@ CREATE TABLE {table_name} (
.expect("physical table route"),
)
.iter()
.map(|(k, v)| (v[0], *k))
.map(|(k, v)| (v.leader_regions[0], *k))
.collect::<HashMap<u32, u64>>();
assert!(region_to_dn_map.len() <= instance.datanodes().len());

View File

@@ -224,7 +224,7 @@ mod tests {
.expect("region routes should be physical"),
)
.iter()
.map(|(k, v)| (v[0], *k))
.map(|(k, v)| (v.leader_regions[0], *k))
.collect::<HashMap<u32, u64>>();
assert!(region_to_dn_map.len() <= instance.datanodes().len());

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use client::{OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::{RegionDistribution, TableMetadataManagerRef};
use common_meta::key::{RegionDistribution, RegionRoleSet, TableMetadataManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use common_recordbatch::RecordBatches;
@@ -166,7 +166,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
to_regions
);
let region_id = RegionId::new(table_id, from_regions[0]);
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
// Trigger region migration.
let procedure = region_migration_manager
.submit_procedure(RegionMigrationProcedureTask::new(
@@ -180,7 +180,12 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
info!("Started region procedure: {}!", procedure.unwrap());
// Prepares expected region distribution.
to_regions.extend(from_regions);
to_regions
.leader_regions
.extend(from_regions.leader_regions);
to_regions
.follower_regions
.extend(from_regions.follower_regions);
// Keeps asc order.
to_regions.sort();
distribution.insert(to_peer_id, to_regions);
@@ -300,10 +305,10 @@ pub async fn test_metric_table_region_migration_by_sql(
let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
info!(
"Selecting from peer: {from_peer_id}, and regions: {:?}",
from_regions[0]
from_regions.leader_regions[0]
);
let to_peer_id = (from_peer_id + 1) % 3;
let region_id = RegionId::new(table_id, from_regions[0]);
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
// Trigger region migration.
let procedure_id =
trigger_migration_by_sql(&cluster, region_id.as_u64(), from_peer_id, to_peer_id).await;
@@ -436,7 +441,7 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
to_regions
);
let region_id = RegionId::new(table_id, from_regions[0]);
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
// Trigger region migration.
let procedure_id =
trigger_migration_by_sql(&cluster, region_id.as_u64(), from_peer_id, to_peer_id).await;
@@ -558,12 +563,12 @@ pub async fn test_region_migration_multiple_regions(
let (peer_2, peer_2_regions) = distribution.pop_first().unwrap();
// Picks the peer only contains as from peer.
let ((from_peer_id, from_regions), (to_peer_id, mut to_regions)) = if peer_1_regions.len() == 1
{
((peer_1, peer_1_regions), (peer_2, peer_2_regions))
} else {
((peer_2, peer_2_regions), (peer_1, peer_1_regions))
};
let ((from_peer_id, from_regions), (to_peer_id, mut to_regions)) =
if peer_1_regions.leader_regions.len() == 1 {
((peer_1, peer_1_regions), (peer_2, peer_2_regions))
} else {
((peer_2, peer_2_regions), (peer_1, peer_1_regions))
};
info!(
"Selecting from peer: {from_peer_id}, and regions: {:?}",
@@ -574,7 +579,7 @@ pub async fn test_region_migration_multiple_regions(
to_regions
);
let region_id = RegionId::new(table_id, from_regions[0]);
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
// Trigger region migration.
let procedure = region_migration_manager
.submit_procedure(RegionMigrationProcedureTask::new(
@@ -588,7 +593,12 @@ pub async fn test_region_migration_multiple_regions(
info!("Started region procedure: {}!", procedure.unwrap());
// Prepares expected region distribution.
to_regions.extend(from_regions);
to_regions
.leader_regions
.extend(from_regions.leader_regions);
to_regions
.follower_regions
.extend(from_regions.follower_regions);
// Keeps asc order.
to_regions.sort();
distribution.insert(to_peer_id, to_regions);
@@ -699,7 +709,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
let region_migration_manager = cluster.metasrv.region_migration_manager();
let (from_peer_id, mut from_regions) = distribution.pop_first().unwrap();
let to_peer_id = 1;
let mut to_regions = Vec::new();
let mut to_regions = RegionRoleSet::default();
info!(
"Selecting from peer: {from_peer_id}, and regions: {:?}",
from_regions
@@ -709,7 +719,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
to_regions
);
let region_id = RegionId::new(table_id, from_regions[0]);
let region_id = RegionId::new(table_id, from_regions.leader_regions[0]);
// Trigger region migration.
let procedure = region_migration_manager
.submit_procedure(RegionMigrationProcedureTask::new(
@@ -723,7 +733,9 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
info!("Started region procedure: {}!", procedure.unwrap());
// Prepares expected region distribution.
to_regions.push(from_regions.remove(0));
to_regions
.leader_regions
.push(from_regions.leader_regions.remove(0));
// Keeps asc order.
to_regions.sort();
distribution.insert(to_peer_id, to_regions);
@@ -1120,7 +1132,7 @@ async fn find_region_distribution_by_sql(
distribution
.entry(datanode_id)
.or_default()
.push(region_id.region_number());
.add_leader_region(region_id.region_number());
}
}