diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index ae5765e3e0..eb55931652 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -189,7 +189,6 @@ fn create_region_routes(regions: Vec) -> Vec { region: Region { id: region_id.into(), name: String::new(), - partition: None, attrs: BTreeMap::new(), partition_expr: Default::default(), }, @@ -200,6 +199,7 @@ fn create_region_routes(regions: Vec) -> Vec { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }); } diff --git a/src/common/meta/src/ddl/alter_table/region_request.rs b/src/common/meta/src/ddl/alter_table/region_request.rs index 1b3dafdd3a..73edd4c361 100644 --- a/src/common/meta/src/ddl/alter_table/region_request.rs +++ b/src/common/meta/src/ddl/alter_table/region_request.rs @@ -201,6 +201,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }]), HashMap::new(), ) diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 127c3e95f9..e6bb7676f5 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -600,6 +600,7 @@ async fn test_on_submit_alter_region_request() { follower_peers: vec![Peer::empty(5)], leader_state: None, leader_down_since: None, + write_route_policy: None, }]; create_physical_table_task.set_table_id(phy_id); create_physical_table_metadata( diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index bf6a83a581..d935aa6a15 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -68,6 +68,7 @@ fn prepare_table_route(table_id: u32) -> TableRouteValue { follower_peers: vec![Peer::empty(5)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), @@ -75,6 +76,7 @@ fn prepare_table_route(table_id: u32) -> TableRouteValue { follower_peers: vec![Peer::empty(4)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 3)), @@ -82,6 +84,7 @@ fn prepare_table_route(table_id: u32) -> TableRouteValue { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ]) } @@ -425,6 +428,7 @@ async fn test_on_update_metadata_add_columns() { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }]; // Puts a value to table name key. ddl_context @@ -528,6 +532,7 @@ async fn test_on_update_table_options() { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }]; // Puts a value to table name key. ddl_context diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index 08e4cad69b..f7dd397f9f 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -521,6 +521,7 @@ async fn test_on_submit_create_request() { follower_peers: vec![Peer::empty(5)], leader_state: None, leader_down_since: None, + write_route_policy: None, }]; create_physical_table_task.set_table_id(table_id); create_physical_table_metadata( diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index 2a303deb44..65c3915adc 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -118,6 +118,7 @@ async fn test_on_datanode_drop_regions() { follower_peers: vec![Peer::empty(5)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), @@ -125,6 +126,7 @@ async fn test_on_datanode_drop_regions() { follower_peers: vec![Peer::empty(4)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 3)), @@ -132,6 +134,7 @@ async fn test_on_datanode_drop_regions() { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ]), HashMap::new(), diff --git a/src/common/meta/src/ddl/utils/region_metadata_lister.rs b/src/common/meta/src/ddl/utils/region_metadata_lister.rs index ed8e9fb792..71e874dc26 100644 --- a/src/common/meta/src/ddl/utils/region_metadata_lister.rs +++ b/src/common/meta/src/ddl/utils/region_metadata_lister.rs @@ -171,6 +171,7 @@ mod tests { follower_peers: vec![Peer::empty(5)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(1024, 2)), @@ -178,6 +179,7 @@ mod tests { follower_peers: vec![Peer::empty(4)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(1024, 3)), @@ -185,6 +187,7 @@ mod tests { follower_peers: vec![Peer::empty(4)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ]; let region_metadatas = lister.list(1024, ®ion_routes).await.unwrap(); @@ -223,6 +226,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(1024, 1)), @@ -230,6 +234,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ]; let region_metadatas = lister.list(1024, ®ion_routes).await.unwrap(); diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index af911853de..4056597bf5 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use std::time::Duration; -use serde::{Deserialize, Deserializer, Serialize}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use store_api::region_engine::SyncRegionFromRequest; use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber}; use strum::Display; @@ -518,19 +518,97 @@ impl Display for GcRegionsReply { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct EnterStagingRegion { pub region_id: RegionId, - pub partition_expr: String, + #[serde( + alias = "partition_expr", + deserialize_with = "deserialize_enter_staging_partition_directive", + serialize_with = "serialize_enter_staging_partition_directive" + )] + pub partition_directive: StagingPartitionDirective, } impl Display for EnterStagingRegion { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "EnterStagingRegion(region_id={}, partition_expr={})", - self.region_id, self.partition_expr + "EnterStagingRegion(region_id={}, partition_directive={})", + self.region_id, self.partition_directive ) } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum StagingPartitionDirective { + UpdatePartitionExpr(String), + RejectAllWrites, +} + +impl StagingPartitionDirective { + /// Returns the partition expression carried by this directive, if any. + pub fn as_partition_expr(&self) -> Option<&str> { + match self { + Self::UpdatePartitionExpr(expr) => Some(expr), + Self::RejectAllWrites => None, + } + } +} + +impl Display for StagingPartitionDirective { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::UpdatePartitionExpr(expr) => write!(f, "UpdatePartitionExpr({})", expr), + Self::RejectAllWrites => write!(f, "RejectAllWrites"), + } + } +} + +fn serialize_enter_staging_partition_directive( + rule: &StagingPartitionDirective, + serializer: S, +) -> std::result::Result +where + S: Serializer, +{ + match rule { + StagingPartitionDirective::UpdatePartitionExpr(expr) => serializer.serialize_str(expr), + StagingPartitionDirective::RejectAllWrites => { + #[derive(Serialize)] + struct RejectAllWritesSer<'a> { + r#type: &'a str, + } + + RejectAllWritesSer { + r#type: "reject_all_writes", + } + .serialize(serializer) + } + } +} + +fn deserialize_enter_staging_partition_directive<'de, D>( + deserializer: D, +) -> std::result::Result +where + D: Deserializer<'de>, +{ + #[derive(Deserialize)] + #[serde(untagged)] + enum Compat { + Legacy(String), + TypeTagged { r#type: String }, + } + + match Compat::deserialize(deserializer)? { + Compat::Legacy(expr) => Ok(StagingPartitionDirective::UpdatePartitionExpr(expr)), + Compat::TypeTagged { r#type } if r#type == "reject_all_writes" => { + Ok(StagingPartitionDirective::RejectAllWrites) + } + Compat::TypeTagged { r#type } => Err(serde::de::Error::custom(format!( + "Unknown enter staging partition directive type: {}", + r#type + ))), + } +} + /// Instruction payload for syncing a region from a manifest or another region. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct SyncRegion { @@ -795,7 +873,7 @@ where #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] pub struct EnterStagingRegionReply { pub region_id: RegionId, - /// Returns true if the region is under the new region rule. + /// Returns true if the region has entered staging with the target directive. pub ready: bool, /// Indicates whether the region exists. pub exists: bool, @@ -1208,6 +1286,31 @@ mod tests { assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply); } + #[test] + fn test_enter_staging_partition_rule_compatibility() { + let legacy = r#"{"region_id":4398046511105,"partition_expr":"{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"}"#; + let enter: EnterStagingRegion = serde_json::from_str(legacy).unwrap(); + assert_eq!(enter.region_id, RegionId::new(1024, 1)); + assert_eq!( + enter.partition_directive, + StagingPartitionDirective::UpdatePartitionExpr( + "{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}" + .to_string() + ) + ); + + let serialized = serde_json::to_string(&enter).unwrap(); + assert!(serialized.contains("\"partition_directive\":\"")); + assert!(!serialized.contains("partition_expr")); + + let reject = r#"{"region_id":4398046511105,"partition_expr":{"type":"reject_all_writes"}}"#; + let enter: EnterStagingRegion = serde_json::from_str(reject).unwrap(); + assert_eq!( + enter.partition_directive, + StagingPartitionDirective::RejectAllWrites + ); + } + #[derive(Debug, Clone, Serialize, Deserialize)] struct LegacyOpenRegion { region_ident: RegionIdent, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 3bb9e1a559..0d123ead42 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -1539,7 +1539,6 @@ mod tests { region: Region { id: region_id.into(), name: "r1".to_string(), - partition: None, attrs: BTreeMap::new(), partition_expr: Default::default(), }, @@ -1547,6 +1546,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, } } @@ -2045,7 +2045,6 @@ mod tests { region: Region { id: 1.into(), name: "r1".to_string(), - partition: None, attrs: BTreeMap::new(), partition_expr: Default::default(), }, @@ -2053,12 +2052,12 @@ mod tests { leader_state: Some(LeaderState::Downgrading), follower_peers: vec![], leader_down_since: Some(current_time_millis()), + write_route_policy: None, }, RegionRoute { region: Region { id: 2.into(), name: "r2".to_string(), - partition: None, attrs: BTreeMap::new(), partition_expr: Default::default(), }, @@ -2066,6 +2065,7 @@ mod tests { leader_state: None, follower_peers: vec![], leader_down_since: None, + write_route_policy: None, }, ]; let table_info = new_test_table_info(); @@ -2504,6 +2504,7 @@ mod tests { follower_peers: vec![Peer::empty(5)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), @@ -2511,6 +2512,7 @@ mod tests { follower_peers: vec![Peer::empty(4)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 3)), @@ -2518,6 +2520,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ]), serialized_options, @@ -2561,6 +2564,7 @@ mod tests { follower_peers: vec![Peer::empty(5)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), @@ -2568,6 +2572,7 @@ mod tests { follower_peers: vec![Peer::empty(4)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 3)), @@ -2575,6 +2580,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ]), serialized_options, diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 46ee23dce2..f8465c7ef6 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -941,7 +941,6 @@ mod tests { region: Region { id: RegionId::new(0, 1), name: "r1".to_string(), - partition: None, attrs: Default::default(), partition_expr: Default::default(), }, @@ -952,12 +951,12 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region { id: RegionId::new(0, 1), name: "r1".to_string(), - partition: None, attrs: Default::default(), partition_expr: Default::default(), }, @@ -968,6 +967,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ], max_region_number: 1, diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index de5c38a1d0..92627c1d8f 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -196,6 +196,7 @@ impl TableRoute { follower_peers, leader_state: None, leader_down_since: None, + write_route_policy: None, }); } @@ -258,6 +259,10 @@ pub struct RegionRoute { #[serde(default)] #[builder(default = "self.default_leader_down_since()")] pub leader_down_since: Option, + /// Special write routing behavior for this region. + #[builder(setter(into, strip_option), default)] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub write_route_policy: Option, } impl RegionRouteBuilder { @@ -287,7 +292,40 @@ pub enum LeaderState { Staging, } +/// The write route policy for the region. +#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)] +pub enum WriteRoutePolicy { + // The default policy. + Normal, + /// Ignores all writes for this region. + /// + /// This policy is typically used during region merge operations, such as repartitioning. + /// For example, when merging Region A and Region B into just Region B, + /// writes to Region A are ignored, while Region B accepts all writes originating from both regions. + IgnoreAllWrites, +} + impl RegionRoute { + /// Returns true if the region should ignore all writes. + pub fn is_ignore_all_writes(&self) -> bool { + matches!( + self.write_route_policy, + Some(WriteRoutePolicy::IgnoreAllWrites) + ) + } + + /// Marks this region as ignore-all for writes. + pub fn set_ignore_all_writes(&mut self) { + self.write_route_policy = Some(WriteRoutePolicy::IgnoreAllWrites); + } + + /// Clears ignore-all write policy and falls back to normal routing behavior. + pub fn clear_ignore_all_writes(&mut self) { + if self.write_route_policy == Some(WriteRoutePolicy::IgnoreAllWrites) { + self.write_route_policy = None; + } + } + /// Returns true if the Leader [`Region`] is downgraded. /// /// The following cases in which the [`Region`] will be downgraded. @@ -376,19 +414,57 @@ impl RegionRoutes { } } -#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)] +#[derive(Debug, Clone, Default, PartialEq, Serialize)] pub struct Region { pub id: RegionId, pub name: String, pub attrs: BTreeMap, - - /// **Deprecated:** Use `partition_expr` instead. - pub partition: Option, - /// The partition expression of the region. - #[serde(default)] + /// The normalized partition expression of the region. pub partition_expr: String, } +#[derive(Debug, Deserialize)] +struct RegionDe { + id: RegionId, + name: String, + #[serde(default)] + attrs: BTreeMap, + #[serde(default)] + partition: Option, + #[serde(default)] + partition_expr: String, +} + +impl<'de> Deserialize<'de> for Region { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let de = RegionDe::deserialize(deserializer)?; + // Compatibility path for legacy serialized routes: prefer the normalized + // `partition_expr` field and only fall back to legacy `partition.value_list`. + let partition_expr = if de.partition_expr.is_empty() { + if let Some(LegacyPartition { value_list, .. }) = &de.partition { + value_list + .first() + .map(|expr| String::from_utf8_lossy(expr).to_string()) + .unwrap_or_default() + } else { + String::new() + } + } else { + de.partition_expr + }; + + Ok(Self { + id: de.id, + name: de.name, + attrs: de.attrs, + partition_expr, + }) + } +} + impl Region { #[cfg(any(test, feature = "testing"))] pub fn new_test(id: RegionId) -> Self { @@ -400,17 +476,7 @@ impl Region { /// Gets the partition expression of the region in compatible mode. pub fn partition_expr(&self) -> String { - if !self.partition_expr.is_empty() { - self.partition_expr.clone() - } else if let Some(LegacyPartition { value_list, .. }) = &self.partition { - if !value_list.is_empty() { - String::from_utf8_lossy(&value_list[0]).to_string() - } else { - "".to_string() - } - } else { - "".to_string() - } + self.partition_expr.clone() } } @@ -436,7 +502,6 @@ impl From for Region { Self { id: r.id.into(), name: r.name, - partition: None, partition_expr, attrs: r.attrs.into_iter().collect::>(), } @@ -519,13 +584,13 @@ mod tests { id: 2.into(), name: "r2".to_string(), attrs: BTreeMap::new(), - partition: None, partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], leader_state: None, leader_down_since: None, + write_route_policy: None, }; assert!(!region_route.is_leader_downgrading()); @@ -542,13 +607,13 @@ mod tests { id: 2.into(), name: "r2".to_string(), attrs: BTreeMap::new(), - partition: None, partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], leader_state: None, leader_down_since: None, + write_route_policy: None, }; let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#; @@ -565,13 +630,13 @@ mod tests { id: 2.into(), name: "r2".to_string(), attrs: BTreeMap::new(), - partition: None, partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], leader_state: Some(LeaderState::Downgrading), leader_down_since: None, + write_route_policy: None, }; let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#; let decoded: RegionRoute = serde_json::from_str(input).unwrap(); @@ -582,13 +647,13 @@ mod tests { id: 2.into(), name: "r2".to_string(), attrs: BTreeMap::new(), - partition: None, partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], leader_state: Some(LeaderState::Downgrading), leader_down_since: None, + write_route_policy: None, }; let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#; let decoded: RegionRoute = serde_json::from_str(input).unwrap(); @@ -599,13 +664,13 @@ mod tests { id: 2.into(), name: "r2".to_string(), attrs: BTreeMap::new(), - partition: None, partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], leader_state: Some(LeaderState::Downgrading), leader_down_since: None, + write_route_policy: None, }; let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#; let decoded: RegionRoute = serde_json::from_str(input).unwrap(); @@ -616,19 +681,65 @@ mod tests { id: 2.into(), name: "r2".to_string(), attrs: BTreeMap::new(), - partition: None, partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], leader_state: Some(LeaderState::Downgrading), leader_down_since: None, + write_route_policy: None, }; let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#; let decoded: RegionRoute = serde_json::from_str(input).unwrap(); assert_eq!(decoded, region_route); } + #[test] + fn test_region_route_write_route_policy_decode_compatibility() { + let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"}],"write_route_policy":"IgnoreAllWrites"}"#; + let decoded: RegionRoute = serde_json::from_str(input).unwrap(); + + assert!(decoded.is_ignore_all_writes()); + } + + #[test] + fn test_region_route_write_route_policy_default_not_serialized() { + let region_route = RegionRoute { + region: Region { + id: 2.into(), + name: "r2".to_string(), + attrs: BTreeMap::new(), + partition_expr: "".to_string(), + }, + leader_peer: Some(Peer::new(1, "a1")), + follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }; + + let encoded = serde_json::to_string(®ion_route).unwrap(); + assert!(!encoded.contains("write_route_policy")); + } + + #[test] + fn test_region_route_write_route_policy_helpers() { + let mut region_route = RegionRoute { + region: Region::new_test(2.into()), + leader_peer: Some(Peer::new(1, "a1")), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }; + + assert!(!region_route.is_ignore_all_writes()); + region_route.set_ignore_all_writes(); + assert!(region_route.is_ignore_all_writes()); + region_route.clear_ignore_all_writes(); + assert!(!region_route.is_ignore_all_writes()); + } + #[test] fn test_region_distribution() { let region_routes = vec![ @@ -637,26 +748,26 @@ mod tests { id: RegionId::new(1, 1), name: "r1".to_string(), attrs: BTreeMap::new(), - partition: None, partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region { id: RegionId::new(1, 2), name: "r2".to_string(), attrs: BTreeMap::new(), - partition: None, partition_expr: "".to_string(), }, leader_peer: Some(Peer::new(2, "a2")), follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ]; @@ -693,7 +804,6 @@ mod tests { let r2: Region = r.into(); assert_eq!(r2.partition_expr(), ""); - assert!(r2.partition.is_none()); let r3: PbRegion = r2.into(); assert_eq!(r3.partition.as_ref().unwrap().expression, ""); @@ -712,7 +822,6 @@ mod tests { let r2: Region = r.into(); assert_eq!(r2.partition_expr(), "{}"); - assert!(r2.partition.is_none()); let r3: PbRegion = r2.into(); assert_eq!(r3.partition.as_ref().unwrap().expression, "{}"); @@ -731,7 +840,6 @@ mod tests { let r2: Region = r.into(); assert_eq!(r2.partition_expr(), "a>b"); - assert!(r2.partition.is_none()); let r3: PbRegion = r2.into(); assert_eq!(r3.partition.as_ref().unwrap().expression, "a>b"); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index da09144d24..0f8ebbbc1c 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -289,7 +289,7 @@ mod tests { HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta, }; use common_meta::instruction::{ - DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion, + DowngradeRegion, EnterStagingRegion, OpenRegion, StagingPartitionDirective, UpgradeRegion, }; use common_meta::kv_backend::memory::MemoryKvBackend; use mito2::config::MitoConfig; @@ -382,7 +382,7 @@ mod tests { // Enter staging region let instruction = Instruction::EnterStagingRegions(vec![EnterStagingRegion { region_id, - partition_expr: "".to_string(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr("".to_string()), }]); assert!( heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction))) diff --git a/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs b/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs index 7f45703355..e612d4b2c6 100644 --- a/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs +++ b/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs @@ -125,7 +125,7 @@ mod tests { use partition::expr::{PartitionExpr, col}; use store_api::path_utils::table_dir; use store_api::region_engine::RegionRole; - use store_api::region_request::EnterStagingRequest; + use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective}; use store_api::storage::RegionId; use super::*; @@ -216,7 +216,9 @@ mod tests { .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.as_json_str().unwrap(), + ), }), ) .await diff --git a/src/datanode/src/heartbeat/handler/enter_staging.rs b/src/datanode/src/heartbeat/handler/enter_staging.rs index e28e067f80..06f4e9aaeb 100644 --- a/src/datanode/src/heartbeat/handler/enter_staging.rs +++ b/src/datanode/src/heartbeat/handler/enter_staging.rs @@ -14,10 +14,14 @@ use common_meta::instruction::{ EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply, + StagingPartitionDirective as InstructionStagingPartitionDirective, }; use common_telemetry::{error, warn}; use futures::future::join_all; -use store_api::region_request::{EnterStagingRequest, RegionRequest}; +use store_api::region_request::{ + EnterStagingRequest, RegionRequest, + StagingPartitionDirective as RequestStagingPartitionDirective, +}; use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; @@ -48,14 +52,9 @@ impl EnterStagingRegionsHandler { ctx: &HandlerContext, EnterStagingRegion { region_id, - partition_expr, + partition_directive, }: EnterStagingRegion, ) -> EnterStagingRegionReply { - common_telemetry::info!( - "Datanode received enter staging region: {}, partition_expr: {}", - region_id, - partition_expr - ); let Some(writable) = ctx.region_server.is_region_leader(region_id) else { warn!("Region: {} is not found", region_id); return EnterStagingRegionReply { @@ -75,11 +74,24 @@ impl EnterStagingRegionsHandler { }; } + common_telemetry::info!("Datanode received enter staging region: {}", region_id); + + let partition_directive = match partition_directive { + InstructionStagingPartitionDirective::UpdatePartitionExpr(expr) => { + RequestStagingPartitionDirective::UpdatePartitionExpr(expr) + } + InstructionStagingPartitionDirective::RejectAllWrites => { + RequestStagingPartitionDirective::RejectAllWrites + } + }; + match ctx .region_server .handle_request( region_id, - RegionRequest::EnterStaging(EnterStagingRequest { partition_expr }), + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive, + }), ) .await { @@ -106,7 +118,9 @@ impl EnterStagingRegionsHandler { mod tests { use std::sync::Arc; - use common_meta::instruction::EnterStagingRegion; + use common_meta::instruction::{ + EnterStagingRegion, StagingPartitionDirective as InstructionStagingPartitionDirective, + }; use common_meta::kv_backend::memory::MemoryKvBackend; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; @@ -136,7 +150,9 @@ mod tests { &handler_context, vec![EnterStagingRegion { region_id, - partition_expr: "".to_string(), + partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr( + "".to_string(), + ), }], ) .await @@ -165,7 +181,9 @@ mod tests { &handler_context, vec![EnterStagingRegion { region_id, - partition_expr: "".to_string(), + partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr( + "".to_string(), + ), }], ) .await @@ -204,7 +222,9 @@ mod tests { &handler_context, vec![EnterStagingRegion { region_id, - partition_expr: PARTITION_EXPR.to_string(), + partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr( + PARTITION_EXPR.to_string(), + ), }], ) .await @@ -221,7 +241,9 @@ mod tests { &handler_context, vec![EnterStagingRegion { region_id, - partition_expr: PARTITION_EXPR.to_string(), + partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr( + PARTITION_EXPR.to_string(), + ), }], ) .await @@ -238,7 +260,9 @@ mod tests { &handler_context, vec![EnterStagingRegion { region_id, - partition_expr: "".to_string(), + partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr( + "".to_string(), + ), }], ) .await @@ -249,4 +273,32 @@ mod tests { assert!(reply.error.is_some()); assert!(!reply.ready); } + + #[tokio::test] + async fn test_enter_staging_reject_all_writes() { + let mut region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + let mut engine_env = TestEnv::new().await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine.clone())); + prepare_region(®ion_server).await; + + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(region_server, kv_backend); + let replies = EnterStagingRegionsHandler + .handle( + &handler_context, + vec![EnterStagingRegion { + region_id, + partition_directive: InstructionStagingPartitionDirective::RejectAllWrites, + }], + ) + .await + .unwrap(); + let replies = replies.expect_enter_staging_regions_reply(); + let reply = &replies[0]; + assert!(reply.exists); + assert!(reply.ready); + assert!(reply.error.is_none()); + } } diff --git a/src/datanode/src/heartbeat/handler/remap_manifest.rs b/src/datanode/src/heartbeat/handler/remap_manifest.rs index 6fc44cf01e..ea27f418ab 100644 --- a/src/datanode/src/heartbeat/handler/remap_manifest.rs +++ b/src/datanode/src/heartbeat/handler/remap_manifest.rs @@ -105,7 +105,9 @@ mod tests { use partition::expr::{PartitionExpr, col}; use store_api::path_utils::table_dir; use store_api::region_engine::RegionRole; - use store_api::region_request::{EnterStagingRequest, RegionRequest}; + use store_api::region_request::{ + EnterStagingRequest, RegionRequest, StagingPartitionDirective, + }; use store_api::storage::RegionId; use crate::heartbeat::handler::remap_manifest::RemapManifestHandler; @@ -192,7 +194,9 @@ mod tests { .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.as_json_str().unwrap(), + ), }), ) .await diff --git a/src/datanode/src/partition_expr_fetcher.rs b/src/datanode/src/partition_expr_fetcher.rs index 1d0b81e32c..3fab16eb37 100644 --- a/src/datanode/src/partition_expr_fetcher.rs +++ b/src/datanode/src/partition_expr_fetcher.rs @@ -75,7 +75,6 @@ mod tests { id: region_id, name: "r".to_string(), attrs: Default::default(), - partition: None, partition_expr: expr_json.to_string(), }, ..Default::default() diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index ea648bfe7f..fef84ef0db 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -337,6 +337,7 @@ mod test { follower_peers: vec![follower_peer.clone()], leader_state: Some(LeaderState::Downgrading), leader_down_since: Some(1), + write_route_policy: None, }, RegionRoute { region: Region::new_test(another_region_id), diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 153e9913ed..db70e3e166 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -337,6 +337,7 @@ mod tests { follower_peers: vec![Peer::empty(2), Peer::empty(3)], leader_state: Some(LeaderState::Downgrading), leader_down_since: Some(current_time_millis()), + write_route_policy: None, }]; env.create_physical_table_metadata(table_info, region_routes) @@ -376,6 +377,7 @@ mod tests { follower_peers: vec![Peer::empty(2), Peer::empty(3)], leader_state: None, leader_down_since: None, + write_route_policy: None, }]; env.create_physical_table_metadata(table_info, region_routes) @@ -403,6 +405,7 @@ mod tests { follower_peers: vec![Peer::empty(2), Peer::empty(3)], leader_state: None, leader_down_since: None, + write_route_policy: None, }]; env.create_physical_table_metadata(table_info, region_routes) @@ -431,6 +434,7 @@ mod tests { follower_peers: vec![Peer::empty(2), Peer::empty(3)], leader_state: Some(LeaderState::Downgrading), leader_down_since: None, + write_route_policy: None, }]; env.create_physical_table_metadata(table_info, region_routes) diff --git a/src/meta-srv/src/procedure/region_migration/utils.rs b/src/meta-srv/src/procedure/region_migration/utils.rs index a7c9e93871..df2e8014e2 100644 --- a/src/meta-srv/src/procedure/region_migration/utils.rs +++ b/src/meta-srv/src/procedure/region_migration/utils.rs @@ -274,6 +274,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }; update_result_with_region_route(&mut result, ®ion_route, 2, 1).unwrap(); assert_eq!( @@ -293,6 +294,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }; update_result_with_region_route(&mut result, ®ion_route, 2, 3).unwrap(); assert_eq!( @@ -312,6 +314,7 @@ mod tests { follower_peers: vec![Peer::empty(2)], leader_state: None, leader_down_since: None, + write_route_policy: None, }; update_result_with_region_route(&mut result, ®ion_route, 1, 2).unwrap(); assert_eq!( @@ -331,6 +334,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }; update_result_with_region_route(&mut result, ®ion_route, 1, 3).unwrap(); assert_eq!( @@ -350,6 +354,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }; let err = update_result_with_region_route(&mut result, ®ion_route, 1, 3).unwrap_err(); assert_matches!(err, Error::Unexpected { .. }); @@ -438,6 +443,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, // Leader peer changed. RegionRoute { @@ -446,6 +452,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, // Peer conflict. RegionRoute { @@ -454,6 +461,7 @@ mod tests { follower_peers: vec![Peer::empty(2)], leader_state: None, leader_down_since: None, + write_route_policy: None, }, // Normal case. RegionRoute { @@ -462,6 +470,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ])), ) diff --git a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs index a9cf55d16d..5119f7194a 100644 --- a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs @@ -19,6 +19,7 @@ use std::time::{Duration, Instant}; use api::v1::meta::MailboxMessage; use common_meta::instruction::{ EnterStagingRegionReply, EnterStagingRegionsReply, Instruction, InstructionReply, + StagingPartitionDirective, }; use common_meta::peer::Peer; use common_procedure::{Context as ProcedureContext, Status}; @@ -26,6 +27,7 @@ use common_telemetry::info; use futures::future::{join_all, try_join_all}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; +use store_api::storage::RegionId; use crate::error::{self, Error, Result}; use crate::handler::HeartbeatMailbox; @@ -33,7 +35,7 @@ use crate::procedure::repartition::group::remap_manifest::RemapManifest; use crate::procedure::repartition::group::utils::{ HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results, }; -use crate::procedure::repartition::group::{Context, GroupPrepareResult, State}; +use crate::procedure::repartition::group::{Context, GroupId, GroupPrepareResult, State}; use crate::procedure::repartition::plan::RegionDescriptor; use crate::procedure::utils::{self, ErrorStrategy}; use crate::service::mailbox::{Channel, MailboxRef}; @@ -71,8 +73,10 @@ impl State for EnterStagingRegion { impl EnterStagingRegion { fn build_enter_staging_instructions( + group_id: GroupId, prepare_result: &GroupPrepareResult, targets: &[RegionDescriptor], + pending_deallocate_region_ids: &[RegionId], ) -> Result>> { let target_partition_expr_by_region = targets .iter() @@ -96,11 +100,32 @@ impl EnterStagingRegion { .map(|region_id| common_meta::instruction::EnterStagingRegion { region_id, // Safety: the target_routes is constructed from the targets, so the region_id is always present in the map. - partition_expr: target_partition_expr_by_region[®ion_id].clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + target_partition_expr_by_region[®ion_id].clone(), + ), }) .collect(); instructions.insert(peer.clone(), enter_staging_regions); } + // For pending deallocate regions, set the partition rule to RejectAllWrites. + for region_id in pending_deallocate_region_ids { + let peer = prepare_result + .source_routes + .iter() + .find(|route| route.region.id == *region_id) + .map(|route| route.leader_peer.as_ref().unwrap()) + .context(error::RepartitionSourceRegionMissingSnafu { + group_id, + region_id: *region_id, + })?; + instructions + .entry(peer.clone()) + .or_insert_with(Vec::new) + .push(common_meta::instruction::EnterStagingRegion { + region_id: *region_id, + partition_directive: StagingPartitionDirective::RejectAllWrites, + }); + } Ok(instructions) } @@ -111,7 +136,12 @@ impl EnterStagingRegion { // Safety: the group prepare result is set in the RepartitionStart state. let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); let targets = &ctx.persistent_ctx.targets; - let instructions = Self::build_enter_staging_instructions(prepare_result, targets)?; + let instructions = Self::build_enter_staging_instructions( + group_id, + prepare_result, + targets, + &ctx.persistent_ctx.pending_deallocate_region_ids, + )?; let target_region_count = targets.len(); let peer_count = instructions.len(); let operation_timeout = @@ -403,9 +433,11 @@ mod tests { use std::assert_matches::assert_matches; use std::time::Duration; + use common_meta::instruction::StagingPartitionDirective; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use store_api::storage::RegionId; + use uuid::Uuid; use crate::error::{self, Error}; use crate::procedure::repartition::group::GroupPrepareResult; @@ -453,9 +485,13 @@ mod tests { central_region_datanode: Peer::empty(1), }; let targets = test_targets(); - let instructions = - EnterStagingRegion::build_enter_staging_instructions(&prepare_result, &targets) - .unwrap(); + let instructions = EnterStagingRegion::build_enter_staging_instructions( + Uuid::new_v4(), + &prepare_result, + &targets, + &[], + ) + .unwrap(); assert_eq!(instructions.len(), 2); let instruction_1 = instructions.get(&Peer::empty(1)).unwrap().clone(); @@ -463,7 +499,9 @@ mod tests { instruction_1, vec![common_meta::instruction::EnterStagingRegion { region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + range_expr("x", 0, 10).as_json_str().unwrap(), + ), }] ); let instruction_2 = instructions.get(&Peer::empty(2)).unwrap().clone(); @@ -471,7 +509,9 @@ mod tests { instruction_2, vec![common_meta::instruction::EnterStagingRegion { region_id: RegionId::new(table_id, 2), - partition_expr: range_expr("x", 10, 20).as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + range_expr("x", 10, 20).as_json_str().unwrap(), + ), }] ); } @@ -483,7 +523,9 @@ mod tests { let peer = Peer::empty(1); let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion { region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + range_expr("x", 0, 10).as_json_str().unwrap(), + ), }]; let timeout = Duration::from_secs(10); @@ -512,7 +554,9 @@ mod tests { let peer = Peer::empty(1); let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion { region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + range_expr("x", 0, 10).as_json_str().unwrap(), + ), }]; let timeout = Duration::from_secs(10); @@ -543,7 +587,9 @@ mod tests { let peer = Peer::empty(1); let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion { region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + range_expr("x", 0, 10).as_json_str().unwrap(), + ), }]; let timeout = Duration::from_secs(10); @@ -579,7 +625,9 @@ mod tests { let peer = Peer::empty(1); let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion { region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + range_expr("x", 0, 10).as_json_str().unwrap(), + ), }]; let timeout = Duration::from_secs(10); diff --git a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs index 3c8f011584..72de2f2934 100644 --- a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs @@ -37,13 +37,13 @@ fn ensure_region_route_expr_match( region_route: &RegionRoute, region_descriptor: &RegionDescriptor, ) -> Result { - let actual = ®ion_route.region.partition_expr; + let actual = region_route.region.partition_expr(); let expected = region_descriptor .partition_expr .as_json_str() .context(error::SerializePartitionExprSnafu)?; ensure!( - actual == &expected, + actual == expected, error::PartitionExprMismatchSnafu { region_id: region_route.region.id, expected, @@ -304,4 +304,51 @@ mod tests { .unwrap_err(); assert_matches!(err, Error::RepartitionTargetRegionMissing { .. }); } + + #[test] + fn test_ensure_route_present_legacy_partition_expr_source() { + let source_region = RegionDescriptor { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 100), + }; + let target_region = RegionDescriptor { + region_id: RegionId::new(1024, 2), + partition_expr: range_expr("x", 0, 10), + }; + let legacy_partition_expr = range_expr("x", 0, 100).as_json_str().unwrap(); + let legacy_region_json = serde_json::json!({ + "id": RegionId::new(1024, 1).as_u64(), + "name": "", + "partition": { + "column_list": ["x"], + "value_list": [legacy_partition_expr] + }, + "partition_expr": "", + "attrs": {} + }); + + let region_routes = vec![ + RegionRoute { + region: serde_json::from_value(legacy_region_json).unwrap(), + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(1024, 2), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + ]; + + let result = RepartitionStart::ensure_route_present( + Uuid::new_v4(), + ®ion_routes, + &[source_region], + &[target_region], + ); + assert!(result.is_ok()); + } } diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs index 17c730a3c2..ecde5f0507 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs @@ -34,6 +34,7 @@ impl UpdateMetadata { group_id: GroupId, sources: &[RegionDescriptor], targets: &[RegionDescriptor], + pending_deallocate_region_ids: &[store_api::storage::RegionId], current_region_routes: &[RegionRoute], ) -> Result> { let mut region_routes = current_region_routes.to_vec(); @@ -54,6 +55,7 @@ impl UpdateMetadata { .as_json_str() .context(error::SerializePartitionExprSnafu)?; region_route.set_leader_staging(); + region_route.clear_ignore_all_writes(); } for source in sources { @@ -64,6 +66,10 @@ impl UpdateMetadata { }, )?; region_route.set_leader_staging(); + if pending_deallocate_region_ids.contains(&source.region_id) { + // When a region is pending deallocation, it should ignore all writes. + region_route.set_ignore_all_writes(); + } } Ok(region_routes) @@ -86,6 +92,7 @@ impl UpdateMetadata { group_id, &ctx.persistent_ctx.sources, &ctx.persistent_ctx.targets, + &ctx.persistent_ctx.pending_deallocate_region_ids, region_routes, )?; @@ -169,6 +176,7 @@ mod tests { group_id, &[source_region], &[target_region], + &[], ®ion_routes, ) .unwrap(); @@ -184,4 +192,53 @@ mod tests { assert!(new_region_routes[1].is_leader_staging()); assert!(!new_region_routes[2].is_leader_staging()); } + + #[test] + fn test_generate_region_routes_mark_pending_deallocate_reject_all_writes() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let pending_deallocate_region_id = RegionId::new(table_id, 1); + let region_routes = vec![ + RegionRoute { + region: Region { + id: pending_deallocate_region_id, + partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(table_id, 2), + partition_expr: String::new(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + ]; + let source_region = RegionDescriptor { + region_id: pending_deallocate_region_id, + partition_expr: range_expr("x", 0, 100), + }; + let target_region = RegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 0, 10), + }; + + let new_region_routes = UpdateMetadata::apply_staging_region_routes( + group_id, + &[source_region], + &[target_region], + &[pending_deallocate_region_id], + ®ion_routes, + ) + .unwrap(); + + assert!(new_region_routes[0].is_leader_staging()); + assert!(new_region_routes[0].is_ignore_all_writes()); + assert!(new_region_routes[1].is_leader_staging()); + assert!(!new_region_routes[1].is_ignore_all_writes()); + } } diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs index 4d35180a87..f8e505f0ff 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs @@ -104,3 +104,65 @@ impl UpdateMetadata { Ok(()) } } + +#[cfg(test)] +mod tests { + use common_meta::peer::Peer; + use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; + use store_api::storage::RegionId; + use uuid::Uuid; + + use crate::procedure::repartition::group::update_metadata::UpdateMetadata; + use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::test_util::range_expr; + + #[test] + fn test_exit_staging_region_routes_keep_reject_all_writes() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let source_region = RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 100), + }; + let target_region = RegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 0, 50), + }; + let mut source_route = RegionRoute { + region: Region { + id: source_region.region_id, + partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + leader_state: Some(LeaderState::Staging), + ..Default::default() + }; + source_route.set_ignore_all_writes(); + + let mut target_route = RegionRoute { + region: Region { + id: target_region.region_id, + partition_expr: range_expr("x", 0, 50).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + leader_state: Some(LeaderState::Staging), + ..Default::default() + }; + target_route.set_ignore_all_writes(); + + let new_region_routes = UpdateMetadata::exit_staging_region_routes( + group_id, + &[source_region], + &[target_region], + &[source_route, target_route], + ) + .unwrap(); + + assert!(!new_region_routes[0].is_leader_staging()); + assert!(new_region_routes[0].is_ignore_all_writes()); + assert!(!new_region_routes[1].is_leader_staging()); + assert!(new_region_routes[1].is_ignore_all_writes()); + } +} diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs index b9a50e7075..e9bef4cf8e 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs @@ -50,6 +50,7 @@ impl UpdateMetadata { )?; region_route.region.partition_expr = source.region.partition_expr.clone(); region_route.clear_leader_staging(); + region_route.clear_ignore_all_writes(); } for target in target_routes { @@ -125,15 +126,19 @@ mod tests { let group_id = Uuid::new_v4(); let table_id = 1024; let region_routes = vec![ - RegionRoute { - region: Region { - id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), + { + let mut route = RegionRoute { + region: Region { + id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + leader_state: Some(LeaderState::Staging), ..Default::default() - }, - leader_peer: Some(Peer::empty(1)), - leader_state: Some(LeaderState::Staging), - ..Default::default() + }; + route.set_ignore_all_writes(); + route }, RegionRoute { region: Region { @@ -182,11 +187,13 @@ mod tests { ) .unwrap(); assert!(!new_region_routes[0].is_leader_staging()); + assert!(!new_region_routes[0].is_ignore_all_writes()); assert_eq!( new_region_routes[0].region.partition_expr, range_expr("x", 0, 20).as_json_str().unwrap(), ); assert!(!new_region_routes[1].is_leader_staging()); + assert!(!new_region_routes[1].is_ignore_all_writes()); assert!(new_region_routes[2].is_leader_downgrading()); } } diff --git a/src/meta-srv/src/procedure/repartition/utils.rs b/src/meta-srv/src/procedure/repartition/utils.rs index ddeb974eef..1bda85699b 100644 --- a/src/meta-srv/src/procedure/repartition/utils.rs +++ b/src/meta-srv/src/procedure/repartition/utils.rs @@ -145,6 +145,7 @@ mod tests { follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, } } diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 20ba6e2091..1d9437ef12 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -39,6 +39,7 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, } } diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 5f9c58277a..9251605aea 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -186,7 +186,7 @@ impl MetricEngineInner { merged_version == request_version, InvalidRequestSnafu { region_id: physical_region_id, - reason: "inconsistent partition rule version in batch" + reason: "inconsistent partition expr version in batch" } ); } else { @@ -312,7 +312,7 @@ impl MetricEngineInner { merged_version == request_version, InvalidRequestSnafu { region_id: logical_region_id, - reason: "inconsistent partition rule version in batch" + reason: "inconsistent partition expr version in batch" } ); } else { @@ -569,7 +569,9 @@ mod tests { }; use store_api::path_utils::table_dir; use store_api::region_engine::RegionEngine; - use store_api::region_request::{EnterStagingRequest, RegionRequest}; + use store_api::region_request::{ + EnterStagingRequest, RegionRequest, StagingPartitionDirective, + }; use store_api::storage::ScanRequest; use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; @@ -1200,7 +1202,9 @@ mod tests { .handle_request( physical_region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await diff --git a/src/mito2/src/engine/apply_staging_manifest_test.rs b/src/mito2/src/engine/apply_staging_manifest_test.rs index cf569f6fb4..6904fbd624 100644 --- a/src/mito2/src/engine/apply_staging_manifest_test.rs +++ b/src/mito2/src/engine/apply_staging_manifest_test.rs @@ -26,7 +26,7 @@ use store_api::region_engine::{ }; use store_api::region_request::{ ApplyStagingManifestRequest, EnterStagingRequest, RegionFlushRequest, RegionPutRequest, - RegionRequest, + RegionRequest, StagingPartitionDirective, }; use store_api::storage::{FileId, RegionId}; @@ -141,7 +141,9 @@ async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_ .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: range_expr("x", 0, 50).as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + range_expr("x", 0, 50).as_json_str().unwrap(), + ), }), ) .await @@ -292,7 +294,9 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) { .handle_request( new_region_id_1, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + range_expr("tag_0", 0, 50).as_json_str().unwrap(), + ), }), ) .await @@ -417,7 +421,9 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + range_expr("tag_0", 0, 50).as_json_str().unwrap(), + ), }), ) .await @@ -510,7 +516,9 @@ async fn test_apply_staging_manifest_change_edit_different_columns_fails_with_fo .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await @@ -629,7 +637,9 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await @@ -740,7 +750,9 @@ async fn test_split_repartition_causes_duplicate_data() { .handle_request( source_region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: target_partition_expr_1.as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + target_partition_expr_1.as_json_str().unwrap(), + ), }), ) .await @@ -784,7 +796,9 @@ async fn test_split_repartition_causes_duplicate_data() { .handle_request( target_region_id_2, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: target_partition_expr_2.as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + target_partition_expr_2.as_json_str().unwrap(), + ), }), ) .await @@ -952,7 +966,9 @@ async fn test_merge_repartition_data_integrity() { .handle_request( target_region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: target_partition_expr.as_json_str().unwrap(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + target_partition_expr.as_json_str().unwrap(), + ), }), ) .await diff --git a/src/mito2/src/engine/partition_filter_test.rs b/src/mito2/src/engine/partition_filter_test.rs index da3d472c99..fdea7d547f 100644 --- a/src/mito2/src/engine/partition_filter_test.rs +++ b/src/mito2/src/engine/partition_filter_test.rs @@ -19,7 +19,9 @@ use common_recordbatch::RecordBatches; use datatypes::value::Value; use partition::expr::col; use store_api::region_engine::RegionEngine; -use store_api::region_request::{EnterStagingRequest, RegionFlushRequest, RegionRequest}; +use store_api::region_request::{ + EnterStagingRequest, RegionFlushRequest, RegionRequest, StagingPartitionDirective, +}; use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; @@ -101,7 +103,9 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) { .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: new_partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + new_partition_expr.clone(), + ), }), ) .await diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index a390eea681..2e9c5045ff 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -36,7 +36,7 @@ use store_api::region_engine::{ }; use store_api::region_request::{ ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest, RegionFlushRequest, - RegionPutRequest, RegionRequest, RegionTruncateRequest, + RegionPutRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective, }; use store_api::storage::{RegionId, ScanRequest}; @@ -252,6 +252,52 @@ fn default_partition_expr() -> String { range_expr("a", 0, 100).as_json_str().unwrap() } +#[tokio::test] +async fn test_staging_reject_all_writes_rejects_put() { + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(2048, 0); + let request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 2), + }; + let err = engine + .handle_request( + region_id, + RegionRequest::Put(RegionPutRequest { + rows, + hint: None, + partition_expr_version: None, + }), + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::StorageUnavailable); + assert_matches!( + err.into_inner().as_any().downcast_ref::().unwrap(), + Error::RejectWrite { .. } + ); +} + #[tokio::test] async fn test_staging_write_partition_expr_version() { test_staging_write_partition_expr_version_with_format(false).await; @@ -285,7 +331,9 @@ async fn test_staging_write_partition_expr_version_with_format(flat_format: bool .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await @@ -316,7 +364,7 @@ async fn test_staging_write_partition_expr_version_with_format(flat_format: bool assert_eq!(err.status_code(), StatusCode::InvalidArguments); assert_matches!( err.into_inner().as_any().downcast_ref::().unwrap(), - Error::PartitionRuleVersionMismatch { .. } + Error::PartitionExprVersionMismatch { .. } ); let compat_rows = Rows { @@ -492,7 +540,9 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) { .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await @@ -500,8 +550,12 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) { let region = engine.get_region(region_id).unwrap(); let staging_partition_info = region.staging_partition_info.lock().unwrap().clone(); assert_eq!( - staging_partition_info.unwrap().partition_expr, - partition_expr + staging_partition_info + .unwrap() + .partition_expr() + .unwrap() + .to_string(), + partition_expr, ); { let manager = region.manifest_ctx.manifest_manager.read().await; @@ -523,7 +577,9 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) { .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await @@ -534,7 +590,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) { .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: "".to_string(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr("".to_string()), }), ) .await @@ -629,7 +685,9 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await @@ -843,7 +901,9 @@ async fn test_enter_staging_writes_partition_expr_change_action_with_format(flat .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await @@ -905,7 +965,9 @@ async fn test_staging_exit_conflict_partition_expr_change_and_change_with_format .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await @@ -1006,7 +1068,9 @@ async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) { .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await @@ -1108,7 +1172,9 @@ async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) { .handle_request( region_id, RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: partition_expr.clone(), + partition_directive: StagingPartitionDirective::UpdatePartitionExpr( + partition_expr.clone(), + ), }), ) .await diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index ff1760b9b9..23f8185cad 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -562,12 +562,12 @@ pub enum Error { }, #[snafu(display( - "Partition rule version mismatch for region {}: request {}, expected {}", + "Partition expr version mismatch for region {}: request {}, expected {}", region_id, request_version, expected_version ))] - PartitionRuleVersionMismatch { + PartitionExprVersionMismatch { region_id: RegionId, request_version: u64, expected_version: u64, @@ -1272,7 +1272,7 @@ impl ErrorExt for Error { | InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidRequest { .. } - | PartitionRuleVersionMismatch { .. } + | PartitionExprVersionMismatch { .. } | FillDefault { .. } | ConvertColumnDataType { .. } | ColumnNotFound { .. } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 156d972497..bef5b9ce07 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -25,6 +25,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; +use common_base::hash::partition_expr_version; use common_telemetry::{error, info, warn}; use crossbeam_utils::atomic::AtomicCell; use partition::expr::PartitionExpr; @@ -36,7 +37,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState, }; -use store_api::region_request::PathType; +use store_api::region_request::{PathType, StagingPartitionDirective}; use store_api::sst_entry::ManifestSstEntry; use store_api::storage::{FileId, RegionId, SequenceNumber}; use tokio::sync::RwLockWriteGuard; @@ -168,8 +169,29 @@ pub type MitoRegionRef = Arc; #[derive(Debug, Clone)] pub(crate) struct StagingPartitionInfo { - pub(crate) partition_expr: String, - pub(crate) partition_expr_version: u64, + pub(crate) partition_directive: StagingPartitionDirective, + pub(crate) partition_rule_version: u64, +} + +impl StagingPartitionInfo { + /// Returns the partition expression carried by the staging directive, if any. + pub(crate) fn partition_expr(&self) -> Option<&str> { + self.partition_directive.partition_expr() + } + + /// Builds staging partition info from a directive and derives its version marker. + pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self { + let partition_rule_version = match &partition_directive { + StagingPartitionDirective::UpdatePartitionExpr(expr) => { + partition_expr_version(Some(expr)) + } + StagingPartitionDirective::RejectAllWrites => 0, + }; + Self { + partition_directive, + partition_rule_version, + } + } } impl MitoRegion { @@ -801,7 +823,7 @@ impl MitoRegion { } staging_partition_info .as_ref() - .map(|info| info.partition_expr.clone()) + .and_then(|info| info.partition_expr().map(ToString::to_string)) } else { let version = self.version(); version.metadata.partition_expr.clone() @@ -813,12 +835,29 @@ impl MitoRegion { let staging_partition_info = self.staging_partition_info.lock().unwrap(); staging_partition_info .as_ref() - .map(|info| info.partition_expr_version) + .map(|info| info.partition_rule_version) .unwrap_or_default() } else { self.version().metadata.partition_expr_version } } + + /// Returns whether writes should be rejected for this region in staging mode. + pub(crate) fn reject_all_writes_in_staging(&self) -> bool { + if !self.is_staging() { + return false; + } + let staging_partition_info = self.staging_partition_info.lock().unwrap(); + staging_partition_info + .as_ref() + .map(|info| { + matches!( + info.partition_directive, + StagingPartitionDirective::RejectAllWrites + ) + }) + .unwrap_or(false) + } } /// Context to update the region manifest. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 34cd4ee485..4f8f1b0d51 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -40,7 +40,7 @@ use store_api::region_request::{ AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest, - RegionOpenRequest, RegionRequest, RegionTruncateRequest, + RegionOpenRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective, }; use store_api::storage::{FileId, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -1057,8 +1057,8 @@ pub(crate) struct RegionChangeResult { pub(crate) struct EnterStagingResult { /// Region id. pub(crate) region_id: RegionId, - /// The new partition expression to apply. - pub(crate) partition_expr: String, + /// The new staging partition directive to apply. + pub(crate) partition_directive: StagingPartitionDirective, /// Result sender. pub(crate) sender: OptionOutputTx, /// Result from the manifest manager. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index dacec92772..71896b3d5d 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -1140,7 +1140,7 @@ impl RegionWorkerLoop { DdlRequest::EnterStaging(req) => { self.handle_enter_staging_request( ddl.region_id, - req.partition_expr, + req.partition_directive, ddl.sender, ) .await; diff --git a/src/mito2/src/worker/handle_apply_staging.rs b/src/mito2/src/worker/handle_apply_staging.rs index 00e57e1ae3..e773150356 100644 --- a/src/mito2/src/worker/handle_apply_staging.rs +++ b/src/mito2/src/worker/handle_apply_staging.rs @@ -76,17 +76,17 @@ impl RegionWorkerLoop { } let staging_partition_info = region.staging_partition_info.lock().unwrap().clone(); - // If the partition expr mismatch, return error. - if staging_partition_info + + let staging_partition_expr = staging_partition_info .as_ref() - .map(|info| &info.partition_expr) - != Some(&request.partition_expr) - { + .and_then(|info| info.partition_expr()); + // If the partition expr mismatch, return error. + if staging_partition_expr != Some(request.partition_expr.as_str()) { sender.send( StagingPartitionExprMismatchSnafu { manifest_expr: staging_partition_info .as_ref() - .map(|info| info.partition_expr.clone()), + .and_then(|info| info.partition_expr().map(ToString::to_string)), request_expr: request.partition_expr, } .fail(), diff --git a/src/mito2/src/worker/handle_enter_staging.rs b/src/mito2/src/worker/handle_enter_staging.rs index 52a31d03a8..7f926cce80 100644 --- a/src/mito2/src/worker/handle_enter_staging.rs +++ b/src/mito2/src/worker/handle_enter_staging.rs @@ -14,10 +14,9 @@ use std::time::Instant; -use common_base::hash::partition_expr_version; use common_telemetry::{error, info, warn}; use store_api::logstore::LogStore; -use store_api::region_request::EnterStagingRequest; +use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective}; use store_api::storage::RegionId; use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu}; @@ -34,27 +33,27 @@ impl RegionWorkerLoop { pub(crate) async fn handle_enter_staging_request( &mut self, region_id: RegionId, - partition_expr: String, + partition_directive: StagingPartitionDirective, mut sender: OptionOutputTx, ) { let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { return; }; - // If the region is already in staging mode, verify the partition expr matches. + // If the region is already in staging mode, verify the partition directive matches. if region.is_staging() { let staging_partition_info = region.staging_partition_info.lock().unwrap().clone(); - // If the partition expr mismatch, return error. + // If the partition directive mismatches, return error. if staging_partition_info .as_ref() - .map(|info| &info.partition_expr) - != Some(&partition_expr) + .map(|info| &info.partition_directive) + != Some(&partition_directive) { sender.send(Err(StagingPartitionExprMismatchSnafu { manifest_expr: staging_partition_info .as_ref() - .map(|info| info.partition_expr.clone()), - request_expr: partition_expr, + .and_then(|info| info.partition_expr().map(ToString::to_string)), + request_expr: format!("{:?}", partition_directive), } .build())); return; @@ -91,16 +90,21 @@ impl RegionWorkerLoop { .add_ddl_request_to_pending(SenderDdlRequest { region_id, sender, - request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }), + request: DdlRequest::EnterStaging(EnterStagingRequest { + partition_directive: partition_directive.clone(), + }), }); return; } - self.handle_enter_staging(region, partition_expr, sender); + self.handle_enter_staging(region, partition_directive, sender); } - async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> { + async fn enter_staging( + region: &MitoRegionRef, + partition_directive: &StagingPartitionDirective, + ) -> Result<()> { let now = Instant::now(); // First step: clear all staging manifest files. { @@ -123,6 +127,20 @@ impl RegionWorkerLoop { ); } + let partition_expr = match partition_directive { + StagingPartitionDirective::UpdatePartitionExpr(partition_expr) => { + partition_expr.clone() + } + StagingPartitionDirective::RejectAllWrites => { + info!( + "Enter staging with reject all writes, region_id: {}", + region.region_id + ); + // Rejects all writes just a memory flag, no need to write new staging manifest. + return Ok(()); + } + }; + // Second step: write new staging manifest. let change = RegionPartitionExprChange { partition_expr: Some(partition_expr.clone()), @@ -140,7 +158,7 @@ impl RegionWorkerLoop { fn handle_enter_staging( &self, region: MitoRegionRef, - partition_expr: String, + partition_directive: StagingPartitionDirective, sender: OptionOutputTx, ) { if let Err(e) = region.set_entering_staging() { @@ -152,7 +170,7 @@ impl RegionWorkerLoop { let request_sender = self.sender.clone(); common_runtime::spawn_global(async move { let now = Instant::now(); - let result = Self::enter_staging(®ion, partition_expr.clone()).await; + let result = Self::enter_staging(®ion, &partition_directive).await; match result { Ok(_) => { info!( @@ -184,7 +202,7 @@ impl RegionWorkerLoop { region_id: region.region_id, sender, result, - partition_expr, + partition_directive, }), }; listener @@ -224,12 +242,12 @@ impl RegionWorkerLoop { if enter_staging_result.result.is_ok() { info!( - "Updating region {} staging partition expr to {}", - region.region_id, enter_staging_result.partition_expr + "Updating region {} staging partition directive to {:?}", + region.region_id, enter_staging_result.partition_directive ); Self::update_region_staging_partition_info( ®ion, - enter_staging_result.partition_expr, + enter_staging_result.partition_directive, ); region.switch_state_to_staging(RegionLeaderState::EnteringStaging); } else { @@ -243,13 +261,14 @@ impl RegionWorkerLoop { .await; } - fn update_region_staging_partition_info(region: &MitoRegionRef, partition_expr: String) { + fn update_region_staging_partition_info( + region: &MitoRegionRef, + partition_directive: StagingPartitionDirective, + ) { let mut staging_partition_info = region.staging_partition_info.lock().unwrap(); debug_assert!(staging_partition_info.is_none()); - let partition_expr_version = partition_expr_version(Some(&partition_expr)); - *staging_partition_info = Some(StagingPartitionInfo { - partition_expr, - partition_expr_version, - }); + *staging_partition_info = Some(StagingPartitionInfo::from_partition_directive( + partition_directive, + )); } } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 8745aa894e..802073b816 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -26,7 +26,7 @@ use store_api::logstore::LogStore; use store_api::storage::RegionId; use crate::error::{ - InvalidRequestSnafu, PartitionRuleVersionMismatchSnafu, RegionStateSnafu, RejectWriteSnafu, + InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionStateSnafu, RejectWriteSnafu, Result, }; use crate::metrics; @@ -253,6 +253,13 @@ impl RegionWorkerLoop { match region.state() { RegionRoleState::Leader(RegionLeaderState::Writable) | RegionRoleState::Leader(RegionLeaderState::Staging) => { + if region.reject_all_writes_in_staging() { + sender_req + .sender + .send(RejectWriteSnafu { region_id }.fail()); + continue; + } + let region_ctx = RegionWriteCtx::new( region.region_id, ®ion.version_control, @@ -305,6 +312,12 @@ impl RegionWorkerLoop { else { continue; }; + if region.reject_all_writes_in_staging() { + sender_req + .sender + .send(RejectWriteSnafu { region_id }.fail()); + continue; + } let expected_version = region.expected_partition_expr_version(); if let Err(e) = check_partition_expr_version( region_id, @@ -380,7 +393,12 @@ impl RegionWorkerLoop { continue; }; match region.state() { - RegionRoleState::Leader(RegionLeaderState::Writable) => { + RegionRoleState::Leader(RegionLeaderState::Writable) + | RegionRoleState::Leader(RegionLeaderState::Staging) => { + if region.reject_all_writes_in_staging() { + bulk_req.sender.send(RejectWriteSnafu { region_id }.fail()); + continue; + } let region_ctx = RegionWriteCtx::new( region.region_id, ®ion.version_control, @@ -420,6 +438,10 @@ impl RegionWorkerLoop { let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender) else { continue; }; + if region.reject_all_writes_in_staging() { + bulk_req.sender.send(RejectWriteSnafu { region_id }.fail()); + continue; + } let expected_version = region.expected_partition_expr_version(); if let Err(e) = check_partition_expr_version( region_id, @@ -505,7 +527,7 @@ fn check_partition_expr_version( Some(value) => value, }; if request_version != expected_version { - return PartitionRuleVersionMismatchSnafu { + return PartitionExprVersionMismatchSnafu { region_id, request_version, expected_version, diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index 78b35a6906..b4ac321089 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -21,7 +21,7 @@ use common_meta::key::TableMetadataManager; use common_meta::key::table_route::TableRouteValue; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; -use common_meta::rpc::router::{LegacyPartition, Region, RegionRoute}; +use common_meta::rpc::router::{Region, RegionRoute}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use moka::future::CacheBuilder; @@ -122,6 +122,14 @@ pub(crate) async fn create_partition_rule_manager( )); let regions = vec![1u32, 2, 3]; let region_wal_options = new_test_region_wal_options(regions.clone()); + let expr_str = serde_json::json!({ + "Expr": { + "lhs": {"Column": "a"}, + "op": "GtEq", + "rhs": {"Value": {"Int32": 50}} + } + }) + .to_string(); table_metadata_manager .create_table_metadata( new_test_table_info(1, "table_1", regions.clone().into_iter()), @@ -130,7 +138,6 @@ pub(crate) async fn create_partition_rule_manager( region: Region { id: 3.into(), name: "r1".to_string(), - partition: None, attrs: BTreeMap::new(), partition_expr: PartitionExpr::new( Operand::Column("a".to_string()), @@ -144,12 +151,12 @@ pub(crate) async fn create_partition_rule_manager( follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { region: Region { id: 2.into(), name: "r2".to_string(), - partition: None, attrs: BTreeMap::new(), partition_expr: PartitionExpr::new( Operand::Expr(PartitionExpr::new( @@ -171,23 +178,26 @@ pub(crate) async fn create_partition_rule_manager( follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, RegionRoute { - region: Region { - id: 1.into(), - name: "r3".to_string(), - // Keep the old partition definition to test compatibility. - partition: Some(LegacyPartition { - column_list: vec![b"a".to_vec()], - value_list: vec![b"{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":50}}}}".to_vec()], - }), - attrs: BTreeMap::new(), - partition_expr: Default::default(), - }, + // Keep legacy `partition` payload to test compatibility. + region: serde_json::from_value(serde_json::json!({ + "id": 1, + "name": "r3", + "partition": { + "column_list": ["a"], + "value_list": [expr_str] + }, + "attrs": {}, + "partition_expr": "" + })) + .unwrap(), leader_peer: Some(Peer::new(1, "")), follower_peers: vec![], leader_state: None, leader_down_since: None, + write_route_policy: None, }, ]), region_wal_options.clone(), diff --git a/src/partition/src/cache.rs b/src/partition/src/cache.rs index 9dd94064bd..a886e1e08d 100644 --- a/src/partition/src/cache.rs +++ b/src/partition/src/cache.rs @@ -57,6 +57,12 @@ pub fn create_partitions_with_version_from_region_routes( ) -> common_meta::error::Result> { let mut partitions = Vec::with_capacity(region_routes.len()); for r in region_routes { + // Ignore regions marked as reject-all-writes; they should not participate + // in writable partition-cache construction. + if r.is_ignore_all_writes() { + continue; + } + let expr_json = r.region.partition_expr(); let partition_expr_version = if expr_json.is_empty() { None @@ -127,3 +133,50 @@ pub fn new_partition_info_cache( |ident| matches!(ident, CacheIdent::TableId(_)), ) } + +#[cfg(test)] +mod tests { + use common_base::hash::partition_expr_version; + use common_meta::rpc::router::{Region, RegionRoute, WriteRoutePolicy}; + use store_api::storage::RegionId; + + use super::create_partitions_with_version_from_region_routes; + + #[test] + fn test_create_partitions_with_version_excludes_reject_all_writes() { + let table_id = 1024; + let expr_json = + r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#; + let region_routes = vec![ + RegionRoute { + region: Region { + id: RegionId::new(table_id, 1), + partition_expr: expr_json.to_string(), + ..Default::default() + }, + leader_peer: None, + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + write_route_policy: Some(WriteRoutePolicy::IgnoreAllWrites), + }, + RegionRoute { + region: Region { + id: RegionId::new(table_id, 2), + partition_expr: expr_json.to_string(), + ..Default::default() + }, + ..Default::default() + }, + ]; + + let partitions = + create_partitions_with_version_from_region_routes(table_id, ®ion_routes).unwrap(); + assert_eq!(1, partitions.len()); + assert_eq!(RegionId::new(table_id, 2), partitions[0].id); + assert_eq!( + Some(partition_expr_version(Some(expr_json))), + partitions[0].partition_expr_version + ); + } +} diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index ace8fd1edd..99d3a87dd3 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -1476,26 +1476,51 @@ impl RegionBulkInsertsRequest { } } -/// Request to stage a region with a new region rule(partition expression). +/// Request to stage a region with a new partition directive. /// /// This request transitions a region into the staging mode. -/// It first flushes the memtable for the old region rule if it is not empty, -/// then enters the staging mode with the new region rule. +/// It first flushes the memtable for the old partition expression if it is not +/// empty, then enters the staging mode with the new directive. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum StagingPartitionDirective { + UpdatePartitionExpr(String), + RejectAllWrites, +} + +impl StagingPartitionDirective { + /// Returns the partition expression carried by this directive, if any. + pub fn partition_expr(&self) -> Option<&str> { + match self { + Self::UpdatePartitionExpr(expr) => Some(expr), + Self::RejectAllWrites => None, + } + } +} + #[derive(Debug, Clone)] pub struct EnterStagingRequest { - /// The partition expression of the staging region. - pub partition_expr: String, + /// The staging partition directive of the region. + pub partition_directive: StagingPartitionDirective, +} + +impl EnterStagingRequest { + /// Builds an enter-staging request with a partition expression directive. + pub fn with_partition_expr(partition_expr: String) -> Self { + Self { + partition_directive: StagingPartitionDirective::UpdatePartitionExpr(partition_expr), + } + } } /// This request is used as part of the region repartition. /// -/// After a region has entered staging mode with a new region rule (partition +/// After a region has entered staging mode with a new partition expression /// expression) and a separate process (for example, `remap_manifests`) has /// generated the new file assignments for the staging region, this request /// applies that generated manifest to the region. /// /// In practice, this means: -/// - The `partition_expr` identifies the staging region rule that the manifest +/// - The `partition_expr` identifies the staging partition expression that the manifest /// was generated for. /// - `central_region_id` specifies which region holds the staging blob storage /// where the manifest was written during the `remap_manifests` operation.