fix(repartition): reject writes on deallocating regions during region merge (#7694)

* feat(meta): add write route policy to region route with backward compatibility

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix(meta): use partition_expr compatibility accessor in repartition matching

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta): introduce staging partition rule enum for repartition instructions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(datanode): plumb staging partition rule enum through heartbeat handlers

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta): mark pending-deallocate regions as reject-all during merge staging

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(partition): exclude reject-all regions from write partitioning

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(mito): store staging partition rule enum in region state

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(mito): reject writes in staging when partition rule is reject-all

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta): send enter staging instruction with reject-all

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix(repartition): preserve reject-all on exit, merge enter-staging instructions, and allow staged bulk writes

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: refactor to ignore all writes

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: rename StagingPartitionRule to StagingPartitionDirective across staging flow

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: nit

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: rename

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-02-25 15:04:38 +08:00
committed by GitHub
parent 6d998c043e
commit df04267c54
40 changed files with 962 additions and 177 deletions

View File

@@ -189,7 +189,6 @@ fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
region: Region {
id: region_id.into(),
name: String::new(),
partition: None,
attrs: BTreeMap::new(),
partition_expr: Default::default(),
},
@@ -200,6 +199,7 @@ fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
});
}

View File

@@ -201,6 +201,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}]),
HashMap::new(),
)

View File

@@ -600,6 +600,7 @@ async fn test_on_submit_alter_region_request() {
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}];
create_physical_table_task.set_table_id(phy_id);
create_physical_table_metadata(

View File

@@ -68,6 +68,7 @@ fn prepare_table_route(table_id: u32) -> TableRouteValue {
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
@@ -75,6 +76,7 @@ fn prepare_table_route(table_id: u32) -> TableRouteValue {
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
@@ -82,6 +84,7 @@ fn prepare_table_route(table_id: u32) -> TableRouteValue {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
])
}
@@ -425,6 +428,7 @@ async fn test_on_update_metadata_add_columns() {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}];
// Puts a value to table name key.
ddl_context
@@ -528,6 +532,7 @@ async fn test_on_update_table_options() {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}];
// Puts a value to table name key.
ddl_context

View File

@@ -521,6 +521,7 @@ async fn test_on_submit_create_request() {
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}];
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(

View File

@@ -118,6 +118,7 @@ async fn test_on_datanode_drop_regions() {
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
@@ -125,6 +126,7 @@ async fn test_on_datanode_drop_regions() {
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
@@ -132,6 +134,7 @@ async fn test_on_datanode_drop_regions() {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
]),
HashMap::new(),

View File

@@ -171,6 +171,7 @@ mod tests {
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
@@ -178,6 +179,7 @@ mod tests {
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 3)),
@@ -185,6 +187,7 @@ mod tests {
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
];
let region_metadatas = lister.list(1024, &region_routes).await.unwrap();
@@ -223,6 +226,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
@@ -230,6 +234,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
];
let region_metadatas = lister.list(1024, &region_routes).await.unwrap();

View File

@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::time::Duration;
use serde::{Deserialize, Deserializer, Serialize};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
@@ -518,19 +518,97 @@ impl Display for GcRegionsReply {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EnterStagingRegion {
pub region_id: RegionId,
pub partition_expr: String,
#[serde(
alias = "partition_expr",
deserialize_with = "deserialize_enter_staging_partition_directive",
serialize_with = "serialize_enter_staging_partition_directive"
)]
pub partition_directive: StagingPartitionDirective,
}
impl Display for EnterStagingRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"EnterStagingRegion(region_id={}, partition_expr={})",
self.region_id, self.partition_expr
"EnterStagingRegion(region_id={}, partition_directive={})",
self.region_id, self.partition_directive
)
}
}
/// Directive attached to an enter-staging instruction.
///
/// It either carries a partition expression string or marks the region as
/// rejecting all writes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StagingPartitionDirective {
    /// Carries the partition expression as a string.
    UpdatePartitionExpr(String),
    /// Carries no partition expression.
    RejectAllWrites,
}

impl StagingPartitionDirective {
    /// Borrows the partition expression held by this directive.
    ///
    /// Returns `None` for [`StagingPartitionDirective::RejectAllWrites`],
    /// which has no expression.
    pub fn as_partition_expr(&self) -> Option<&str> {
        match self {
            Self::RejectAllWrites => None,
            Self::UpdatePartitionExpr(expr) => Some(expr.as_str()),
        }
    }
}

impl Display for StagingPartitionDirective {
    /// Writes the variant name, followed by the expression in parentheses
    /// when the variant carries one.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::RejectAllWrites => f.write_str("RejectAllWrites"),
            Self::UpdatePartitionExpr(expr) => write!(f, "UpdatePartitionExpr({})", expr),
        }
    }
}
/// Serializes a [`StagingPartitionDirective`] for use with `serialize_with`.
///
/// * `UpdatePartitionExpr(expr)` is written as a bare string.
/// * `RejectAllWrites` is written as a struct with a single `type` field whose
///   value is `"reject_all_writes"`.
fn serialize_enter_staging_partition_directive<S>(
    directive: &StagingPartitionDirective,
    serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
    S: Serializer,
{
    /// Type-tagged wire form used for directives that carry no expression.
    #[derive(Serialize)]
    struct RejectAllWritesSer<'a> {
        r#type: &'a str,
    }

    match directive {
        StagingPartitionDirective::RejectAllWrites => {
            let tagged = RejectAllWritesSer {
                r#type: "reject_all_writes",
            };
            tagged.serialize(serializer)
        }
        StagingPartitionDirective::UpdatePartitionExpr(expr) => serializer.serialize_str(expr),
    }
}
fn deserialize_enter_staging_partition_directive<'de, D>(
deserializer: D,
) -> std::result::Result<StagingPartitionDirective, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum Compat {
Legacy(String),
TypeTagged { r#type: String },
}
match Compat::deserialize(deserializer)? {
Compat::Legacy(expr) => Ok(StagingPartitionDirective::UpdatePartitionExpr(expr)),
Compat::TypeTagged { r#type } if r#type == "reject_all_writes" => {
Ok(StagingPartitionDirective::RejectAllWrites)
}
Compat::TypeTagged { r#type } => Err(serde::de::Error::custom(format!(
"Unknown enter staging partition directive type: {}",
r#type
))),
}
}
/// Instruction payload for syncing a region from a manifest or another region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncRegion {
@@ -795,7 +873,7 @@ where
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionReply {
pub region_id: RegionId,
/// Returns true if the region is under the new region rule.
/// Returns true if the region has entered staging with the target directive.
pub ready: bool,
/// Indicates whether the region exists.
pub exists: bool,
@@ -1208,6 +1286,31 @@ mod tests {
assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
}
// Checks that `EnterStagingRegion` still decodes payloads that use the old
// `partition_expr` key, and that it re-encodes under `partition_directive`.
#[test]
fn test_enter_staging_partition_rule_compatibility() {
// Legacy payload: `partition_expr` holds a plain string.
let legacy = r#"{"region_id":4398046511105,"partition_expr":"{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"}"#;
let enter: EnterStagingRegion = serde_json::from_str(legacy).unwrap();
assert_eq!(enter.region_id, RegionId::new(1024, 1));
assert_eq!(
enter.partition_directive,
StagingPartitionDirective::UpdatePartitionExpr(
"{\"Expr\":{\"lhs\":{\"Column\":\"x\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":0}}}}"
.to_string()
)
);
// Re-encoding writes the new key with a string value and drops the old key.
let serialized = serde_json::to_string(&enter).unwrap();
assert!(serialized.contains("\"partition_directive\":\""));
assert!(!serialized.contains("partition_expr"));
// The type-tagged object form is also accepted under the old key.
let reject = r#"{"region_id":4398046511105,"partition_expr":{"type":"reject_all_writes"}}"#;
let enter: EnterStagingRegion = serde_json::from_str(reject).unwrap();
assert_eq!(
enter.partition_directive,
StagingPartitionDirective::RejectAllWrites
);
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyOpenRegion {
region_ident: RegionIdent,

View File

@@ -1539,7 +1539,6 @@ mod tests {
region: Region {
id: region_id.into(),
name: "r1".to_string(),
partition: None,
attrs: BTreeMap::new(),
partition_expr: Default::default(),
},
@@ -1547,6 +1546,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}
}
@@ -2045,7 +2045,6 @@ mod tests {
region: Region {
id: 1.into(),
name: "r1".to_string(),
partition: None,
attrs: BTreeMap::new(),
partition_expr: Default::default(),
},
@@ -2053,12 +2052,12 @@ mod tests {
leader_state: Some(LeaderState::Downgrading),
follower_peers: vec![],
leader_down_since: Some(current_time_millis()),
write_route_policy: None,
},
RegionRoute {
region: Region {
id: 2.into(),
name: "r2".to_string(),
partition: None,
attrs: BTreeMap::new(),
partition_expr: Default::default(),
},
@@ -2066,6 +2065,7 @@ mod tests {
leader_state: None,
follower_peers: vec![],
leader_down_since: None,
write_route_policy: None,
},
];
let table_info = new_test_table_info();
@@ -2504,6 +2504,7 @@ mod tests {
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
@@ -2511,6 +2512,7 @@ mod tests {
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
@@ -2518,6 +2520,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
]),
serialized_options,
@@ -2561,6 +2564,7 @@ mod tests {
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
@@ -2568,6 +2572,7 @@ mod tests {
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
@@ -2575,6 +2580,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
]),
serialized_options,

View File

@@ -941,7 +941,6 @@ mod tests {
region: Region {
id: RegionId::new(0, 1),
name: "r1".to_string(),
partition: None,
attrs: Default::default(),
partition_expr: Default::default(),
},
@@ -952,12 +951,12 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region {
id: RegionId::new(0, 1),
name: "r1".to_string(),
partition: None,
attrs: Default::default(),
partition_expr: Default::default(),
},
@@ -968,6 +967,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
],
max_region_number: 1,

View File

@@ -196,6 +196,7 @@ impl TableRoute {
follower_peers,
leader_state: None,
leader_down_since: None,
write_route_policy: None,
});
}
@@ -258,6 +259,10 @@ pub struct RegionRoute {
#[serde(default)]
#[builder(default = "self.default_leader_down_since()")]
pub leader_down_since: Option<i64>,
/// Special write routing behavior for this region.
#[builder(setter(into, strip_option), default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub write_route_policy: Option<WriteRoutePolicy>,
}
impl RegionRouteBuilder {
@@ -287,7 +292,40 @@ pub enum LeaderState {
Staging,
}
/// The write route policy for the region.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
pub enum WriteRoutePolicy {
/// The default policy.
Normal,
/// Ignores all writes for this region.
///
/// This policy is typically used during region merge operations, such as repartitioning.
/// For example, when merging Region A and Region B into just Region B,
/// writes to Region A are ignored, while Region B accepts all writes originating from both regions.
IgnoreAllWrites,
}
impl RegionRoute {
/// Reports whether this route's write policy is
/// [`WriteRoutePolicy::IgnoreAllWrites`].
///
/// Returns `false` when no policy is set or when any other policy is set.
pub fn is_ignore_all_writes(&self) -> bool {
    self.write_route_policy == Some(WriteRoutePolicy::IgnoreAllWrites)
}
/// Marks this region as ignore-all for writes.
///
/// Sets `write_route_policy` to `Some(WriteRoutePolicy::IgnoreAllWrites)`,
/// overwriting whatever policy was stored before.
pub fn set_ignore_all_writes(&mut self) {
self.write_route_policy = Some(WriteRoutePolicy::IgnoreAllWrites);
}
/// Resets `write_route_policy` to `None` if it is currently
/// [`WriteRoutePolicy::IgnoreAllWrites`].
///
/// Any other stored policy (or an unset policy) is left untouched.
pub fn clear_ignore_all_writes(&mut self) {
    if matches!(
        self.write_route_policy,
        Some(WriteRoutePolicy::IgnoreAllWrites)
    ) {
        self.write_route_policy = None;
    }
}
/// Returns true if the Leader [`Region`] is downgraded.
///
/// The following cases in which the [`Region`] will be downgraded.
@@ -376,19 +414,57 @@ impl RegionRoutes {
}
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
#[derive(Debug, Clone, Default, PartialEq, Serialize)]
pub struct Region {
pub id: RegionId,
pub name: String,
pub attrs: BTreeMap<String, String>,
/// **Deprecated:** Use `partition_expr` instead.
pub partition: Option<LegacyPartition>,
/// The partition expression of the region.
#[serde(default)]
/// The normalized partition expression of the region.
pub partition_expr: String,
}
/// Private deserialization shape for [`Region`].
///
/// It accepts the legacy `partition` field alongside `partition_expr`, so the
/// `Deserialize` impl for [`Region`] can fall back to the legacy value when
/// `partition_expr` is empty.
#[derive(Debug, Deserialize)]
struct RegionDe {
id: RegionId,
name: String,
// Missing `attrs` decodes as an empty map.
#[serde(default)]
attrs: BTreeMap<String, String>,
// Legacy partition payload; missing or `null` decodes as `None`.
#[serde(default)]
partition: Option<LegacyPartition>,
// Missing `partition_expr` decodes as an empty string.
#[serde(default)]
partition_expr: String,
}
impl<'de> Deserialize<'de> for Region {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let de = RegionDe::deserialize(deserializer)?;
// Compatibility path for legacy serialized routes: prefer the normalized
// `partition_expr` field and only fall back to legacy `partition.value_list`.
let partition_expr = if de.partition_expr.is_empty() {
if let Some(LegacyPartition { value_list, .. }) = &de.partition {
value_list
.first()
.map(|expr| String::from_utf8_lossy(expr).to_string())
.unwrap_or_default()
} else {
String::new()
}
} else {
de.partition_expr
};
Ok(Self {
id: de.id,
name: de.name,
attrs: de.attrs,
partition_expr,
})
}
}
impl Region {
#[cfg(any(test, feature = "testing"))]
pub fn new_test(id: RegionId) -> Self {
@@ -400,17 +476,7 @@ impl Region {
/// Gets the partition expression of the region in compatible mode.
pub fn partition_expr(&self) -> String {
if !self.partition_expr.is_empty() {
self.partition_expr.clone()
} else if let Some(LegacyPartition { value_list, .. }) = &self.partition {
if !value_list.is_empty() {
String::from_utf8_lossy(&value_list[0]).to_string()
} else {
"".to_string()
}
} else {
"".to_string()
}
self.partition_expr.clone()
}
}
@@ -436,7 +502,6 @@ impl From<PbRegion> for Region {
Self {
id: r.id.into(),
name: r.name,
partition: None,
partition_expr,
attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
}
@@ -519,13 +584,13 @@ mod tests {
id: 2.into(),
name: "r2".to_string(),
attrs: BTreeMap::new(),
partition: None,
partition_expr: "".to_string(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
};
assert!(!region_route.is_leader_downgrading());
@@ -542,13 +607,13 @@ mod tests {
id: 2.into(),
name: "r2".to_string(),
attrs: BTreeMap::new(),
partition: None,
partition_expr: "".to_string(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
};
let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;
@@ -565,13 +630,13 @@ mod tests {
id: 2.into(),
name: "r2".to_string(),
attrs: BTreeMap::new(),
partition: None,
partition_expr: "".to_string(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: None,
write_route_policy: None,
};
let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#;
let decoded: RegionRoute = serde_json::from_str(input).unwrap();
@@ -582,13 +647,13 @@ mod tests {
id: 2.into(),
name: "r2".to_string(),
attrs: BTreeMap::new(),
partition: None,
partition_expr: "".to_string(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: None,
write_route_policy: None,
};
let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#;
let decoded: RegionRoute = serde_json::from_str(input).unwrap();
@@ -599,13 +664,13 @@ mod tests {
id: 2.into(),
name: "r2".to_string(),
attrs: BTreeMap::new(),
partition: None,
partition_expr: "".to_string(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: None,
write_route_policy: None,
};
let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#;
let decoded: RegionRoute = serde_json::from_str(input).unwrap();
@@ -616,19 +681,65 @@ mod tests {
id: 2.into(),
name: "r2".to_string(),
attrs: BTreeMap::new(),
partition: None,
partition_expr: "".to_string(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: None,
write_route_policy: None,
};
let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#;
let decoded: RegionRoute = serde_json::from_str(input).unwrap();
assert_eq!(decoded, region_route);
}
// Decodes a route whose JSON carries `write_route_policy` but omits
// `leader_state` and `leader_down_since`, and checks the policy is picked up.
#[test]
fn test_region_route_write_route_policy_decode_compatibility() {
let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"}],"write_route_policy":"IgnoreAllWrites"}"#;
let decoded: RegionRoute = serde_json::from_str(input).unwrap();
assert!(decoded.is_ignore_all_writes());
}
// A route with `write_route_policy: None` must not emit the field when
// serialized to JSON.
#[test]
fn test_region_route_write_route_policy_default_not_serialized() {
let region_route = RegionRoute {
region: Region {
id: 2.into(),
name: "r2".to_string(),
attrs: BTreeMap::new(),
partition_expr: "".to_string(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
};
let encoded = serde_json::to_string(&region_route).unwrap();
// The key itself must be absent, not just set to null.
assert!(!encoded.contains("write_route_policy"));
}
// Exercises the set / query / clear helpers for the ignore-all-writes policy.
#[test]
fn test_region_route_write_route_policy_helpers() {
let mut region_route = RegionRoute {
region: Region::new_test(2.into()),
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
};
// Starts without a policy.
assert!(!region_route.is_ignore_all_writes());
region_route.set_ignore_all_writes();
assert!(region_route.is_ignore_all_writes());
// Clearing returns the route to the non-ignoring state.
region_route.clear_ignore_all_writes();
assert!(!region_route.is_ignore_all_writes());
}
#[test]
fn test_region_distribution() {
let region_routes = vec![
@@ -637,26 +748,26 @@ mod tests {
id: RegionId::new(1, 1),
name: "r1".to_string(),
attrs: BTreeMap::new(),
partition: None,
partition_expr: "".to_string(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region {
id: RegionId::new(1, 2),
name: "r2".to_string(),
attrs: BTreeMap::new(),
partition: None,
partition_expr: "".to_string(),
},
leader_peer: Some(Peer::new(2, "a2")),
follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
];
@@ -693,7 +804,6 @@ mod tests {
let r2: Region = r.into();
assert_eq!(r2.partition_expr(), "");
assert!(r2.partition.is_none());
let r3: PbRegion = r2.into();
assert_eq!(r3.partition.as_ref().unwrap().expression, "");
@@ -712,7 +822,6 @@ mod tests {
let r2: Region = r.into();
assert_eq!(r2.partition_expr(), "{}");
assert!(r2.partition.is_none());
let r3: PbRegion = r2.into();
assert_eq!(r3.partition.as_ref().unwrap().expression, "{}");
@@ -731,7 +840,6 @@ mod tests {
let r2: Region = r.into();
assert_eq!(r2.partition_expr(), "a>b");
assert!(r2.partition.is_none());
let r3: PbRegion = r2.into();
assert_eq!(r3.partition.as_ref().unwrap().expression, "a>b");

View File

@@ -289,7 +289,7 @@ mod tests {
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
};
use common_meta::instruction::{
DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion,
DowngradeRegion, EnterStagingRegion, OpenRegion, StagingPartitionDirective, UpgradeRegion,
};
use common_meta::kv_backend::memory::MemoryKvBackend;
use mito2::config::MitoConfig;
@@ -382,7 +382,7 @@ mod tests {
// Enter staging region
let instruction = Instruction::EnterStagingRegions(vec![EnterStagingRegion {
region_id,
partition_expr: "".to_string(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr("".to_string()),
}]);
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))

View File

@@ -125,7 +125,7 @@ mod tests {
use partition::expr::{PartitionExpr, col};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionRole;
use store_api::region_request::EnterStagingRequest;
use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
use store_api::storage::RegionId;
use super::*;
@@ -216,7 +216,9 @@ mod tests {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.as_json_str().unwrap(),
),
}),
)
.await

View File

@@ -14,10 +14,14 @@
use common_meta::instruction::{
EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply,
StagingPartitionDirective as InstructionStagingPartitionDirective,
};
use common_telemetry::{error, warn};
use futures::future::join_all;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::region_request::{
EnterStagingRequest, RegionRequest,
StagingPartitionDirective as RequestStagingPartitionDirective,
};
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
@@ -48,14 +52,9 @@ impl EnterStagingRegionsHandler {
ctx: &HandlerContext,
EnterStagingRegion {
region_id,
partition_expr,
partition_directive,
}: EnterStagingRegion,
) -> EnterStagingRegionReply {
common_telemetry::info!(
"Datanode received enter staging region: {}, partition_expr: {}",
region_id,
partition_expr
);
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return EnterStagingRegionReply {
@@ -75,11 +74,24 @@ impl EnterStagingRegionsHandler {
};
}
common_telemetry::info!("Datanode received enter staging region: {}", region_id);
let partition_directive = match partition_directive {
InstructionStagingPartitionDirective::UpdatePartitionExpr(expr) => {
RequestStagingPartitionDirective::UpdatePartitionExpr(expr)
}
InstructionStagingPartitionDirective::RejectAllWrites => {
RequestStagingPartitionDirective::RejectAllWrites
}
};
match ctx
.region_server
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest { partition_expr }),
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive,
}),
)
.await
{
@@ -106,7 +118,9 @@ impl EnterStagingRegionsHandler {
mod tests {
use std::sync::Arc;
use common_meta::instruction::EnterStagingRegion;
use common_meta::instruction::{
EnterStagingRegion, StagingPartitionDirective as InstructionStagingPartitionDirective,
};
use common_meta::kv_backend::memory::MemoryKvBackend;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
@@ -136,7 +150,9 @@ mod tests {
&handler_context,
vec![EnterStagingRegion {
region_id,
partition_expr: "".to_string(),
partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
"".to_string(),
),
}],
)
.await
@@ -165,7 +181,9 @@ mod tests {
&handler_context,
vec![EnterStagingRegion {
region_id,
partition_expr: "".to_string(),
partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
"".to_string(),
),
}],
)
.await
@@ -204,7 +222,9 @@ mod tests {
&handler_context,
vec![EnterStagingRegion {
region_id,
partition_expr: PARTITION_EXPR.to_string(),
partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
PARTITION_EXPR.to_string(),
),
}],
)
.await
@@ -221,7 +241,9 @@ mod tests {
&handler_context,
vec![EnterStagingRegion {
region_id,
partition_expr: PARTITION_EXPR.to_string(),
partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
PARTITION_EXPR.to_string(),
),
}],
)
.await
@@ -238,7 +260,9 @@ mod tests {
&handler_context,
vec![EnterStagingRegion {
region_id,
partition_expr: "".to_string(),
partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
"".to_string(),
),
}],
)
.await
@@ -249,4 +273,32 @@ mod tests {
assert!(reply.error.is_some());
assert!(!reply.ready);
}
// Sends an enter-staging instruction carrying `RejectAllWrites` to a prepared
// region and expects a successful reply.
#[tokio::test]
async fn test_enter_staging_reject_all_writes() {
let mut region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let mut engine_env = TestEnv::new().await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine.clone()));
prepare_region(&region_server).await;
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
let replies = EnterStagingRegionsHandler
.handle(
&handler_context,
vec![EnterStagingRegion {
region_id,
partition_directive: InstructionStagingPartitionDirective::RejectAllWrites,
}],
)
.await
.unwrap();
let replies = replies.expect_enter_staging_regions_reply();
let reply = &replies[0];
// The region was found, reported ready, and no error was returned.
assert!(reply.exists);
assert!(reply.ready);
assert!(reply.error.is_none());
}
}

View File

@@ -105,7 +105,9 @@ mod tests {
use partition::expr::{PartitionExpr, col};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionRole;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::region_request::{
EnterStagingRequest, RegionRequest, StagingPartitionDirective,
};
use store_api::storage::RegionId;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
@@ -192,7 +194,9 @@ mod tests {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.as_json_str().unwrap(),
),
}),
)
.await

View File

@@ -75,7 +75,6 @@ mod tests {
id: region_id,
name: "r".to_string(),
attrs: Default::default(),
partition: None,
partition_expr: expr_json.to_string(),
},
..Default::default()

View File

@@ -337,6 +337,7 @@ mod test {
follower_peers: vec![follower_peer.clone()],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: Some(1),
write_route_policy: None,
},
RegionRoute {
region: Region::new_test(another_region_id),

View File

@@ -337,6 +337,7 @@ mod tests {
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: Some(current_time_millis()),
write_route_policy: None,
}];
env.create_physical_table_metadata(table_info, region_routes)
@@ -376,6 +377,7 @@ mod tests {
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}];
env.create_physical_table_metadata(table_info, region_routes)
@@ -403,6 +405,7 @@ mod tests {
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}];
env.create_physical_table_metadata(table_info, region_routes)
@@ -431,6 +434,7 @@ mod tests {
follower_peers: vec![Peer::empty(2), Peer::empty(3)],
leader_state: Some(LeaderState::Downgrading),
leader_down_since: None,
write_route_policy: None,
}];
env.create_physical_table_metadata(table_info, region_routes)

View File

@@ -274,6 +274,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
};
update_result_with_region_route(&mut result, &region_route, 2, 1).unwrap();
assert_eq!(
@@ -293,6 +294,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
};
update_result_with_region_route(&mut result, &region_route, 2, 3).unwrap();
assert_eq!(
@@ -312,6 +314,7 @@ mod tests {
follower_peers: vec![Peer::empty(2)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
};
update_result_with_region_route(&mut result, &region_route, 1, 2).unwrap();
assert_eq!(
@@ -331,6 +334,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
};
update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap();
assert_eq!(
@@ -350,6 +354,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
};
let err = update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
@@ -438,6 +443,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
// Leader peer changed.
RegionRoute {
@@ -446,6 +452,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
// Peer conflict.
RegionRoute {
@@ -454,6 +461,7 @@ mod tests {
follower_peers: vec![Peer::empty(2)],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
// Normal case.
RegionRoute {
@@ -462,6 +470,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
])),
)

View File

@@ -19,6 +19,7 @@ use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{
EnterStagingRegionReply, EnterStagingRegionsReply, Instruction, InstructionReply,
StagingPartitionDirective,
};
use common_meta::peer::Peer;
use common_procedure::{Context as ProcedureContext, Status};
@@ -26,6 +27,7 @@ use common_telemetry::info;
use futures::future::{join_all, try_join_all};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
@@ -33,7 +35,7 @@ use crate::procedure::repartition::group::remap_manifest::RemapManifest;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
use crate::procedure::repartition::group::{Context, GroupPrepareResult, State};
use crate::procedure::repartition::group::{Context, GroupId, GroupPrepareResult, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::utils::{self, ErrorStrategy};
use crate::service::mailbox::{Channel, MailboxRef};
@@ -71,8 +73,10 @@ impl State for EnterStagingRegion {
impl EnterStagingRegion {
fn build_enter_staging_instructions(
group_id: GroupId,
prepare_result: &GroupPrepareResult,
targets: &[RegionDescriptor],
pending_deallocate_region_ids: &[RegionId],
) -> Result<HashMap<Peer, Vec<common_meta::instruction::EnterStagingRegion>>> {
let target_partition_expr_by_region = targets
.iter()
@@ -96,11 +100,32 @@ impl EnterStagingRegion {
.map(|region_id| common_meta::instruction::EnterStagingRegion {
region_id,
// Safety: the target_routes is constructed from the targets, so the region_id is always present in the map.
partition_expr: target_partition_expr_by_region[&region_id].clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
target_partition_expr_by_region[&region_id].clone(),
),
})
.collect();
instructions.insert(peer.clone(), enter_staging_regions);
}
// Pending-deallocate regions enter staging with the `RejectAllWrites` directive.
for &region_id in pending_deallocate_region_ids {
    // Resolve the leader peer of the region from the source routes. A route that has
    // no leader peer is reported through the same error as a missing route, so the
    // procedure fails with an error instead of panicking on `unwrap()`.
    let peer = prepare_result
        .source_routes
        .iter()
        .find(|route| route.region.id == region_id)
        .and_then(|route| route.leader_peer.as_ref())
        .context(error::RepartitionSourceRegionMissingSnafu {
            group_id,
            region_id,
        })?;
    // Append to whatever instructions are already queued for this peer.
    instructions
        .entry(peer.clone())
        .or_default()
        .push(common_meta::instruction::EnterStagingRegion {
            region_id,
            partition_directive: StagingPartitionDirective::RejectAllWrites,
        });
}
Ok(instructions)
}
@@ -111,7 +136,12 @@ impl EnterStagingRegion {
// Safety: the group prepare result is set in the RepartitionStart state.
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let targets = &ctx.persistent_ctx.targets;
let instructions = Self::build_enter_staging_instructions(prepare_result, targets)?;
let instructions = Self::build_enter_staging_instructions(
group_id,
prepare_result,
targets,
&ctx.persistent_ctx.pending_deallocate_region_ids,
)?;
let target_region_count = targets.len();
let peer_count = instructions.len();
let operation_timeout =
@@ -403,9 +433,11 @@ mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;
use common_meta::instruction::StagingPartitionDirective;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::error::{self, Error};
use crate::procedure::repartition::group::GroupPrepareResult;
@@ -453,9 +485,13 @@ mod tests {
central_region_datanode: Peer::empty(1),
};
let targets = test_targets();
let instructions =
EnterStagingRegion::build_enter_staging_instructions(&prepare_result, &targets)
.unwrap();
let instructions = EnterStagingRegion::build_enter_staging_instructions(
Uuid::new_v4(),
&prepare_result,
&targets,
&[],
)
.unwrap();
assert_eq!(instructions.len(), 2);
let instruction_1 = instructions.get(&Peer::empty(1)).unwrap().clone();
@@ -463,7 +499,9 @@ mod tests {
instruction_1,
vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
range_expr("x", 0, 10).as_json_str().unwrap(),
),
}]
);
let instruction_2 = instructions.get(&Peer::empty(2)).unwrap().clone();
@@ -471,7 +509,9 @@ mod tests {
instruction_2,
vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 10, 20).as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
range_expr("x", 10, 20).as_json_str().unwrap(),
),
}]
);
}
@@ -483,7 +523,9 @@ mod tests {
let peer = Peer::empty(1);
let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
range_expr("x", 0, 10).as_json_str().unwrap(),
),
}];
let timeout = Duration::from_secs(10);
@@ -512,7 +554,9 @@ mod tests {
let peer = Peer::empty(1);
let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
range_expr("x", 0, 10).as_json_str().unwrap(),
),
}];
let timeout = Duration::from_secs(10);
@@ -543,7 +587,9 @@ mod tests {
let peer = Peer::empty(1);
let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
range_expr("x", 0, 10).as_json_str().unwrap(),
),
}];
let timeout = Duration::from_secs(10);
@@ -579,7 +625,9 @@ mod tests {
let peer = Peer::empty(1);
let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
range_expr("x", 0, 10).as_json_str().unwrap(),
),
}];
let timeout = Duration::from_secs(10);

View File

@@ -37,13 +37,13 @@ fn ensure_region_route_expr_match(
region_route: &RegionRoute,
region_descriptor: &RegionDescriptor,
) -> Result<RegionRoute> {
let actual = &region_route.region.partition_expr;
let actual = region_route.region.partition_expr();
let expected = region_descriptor
.partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?;
ensure!(
actual == &expected,
actual == expected,
error::PartitionExprMismatchSnafu {
region_id: region_route.region.id,
expected,
@@ -304,4 +304,51 @@ mod tests {
.unwrap_err();
assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
}
#[test]
fn test_ensure_route_present_legacy_partition_expr_source() {
    let source_id = RegionId::new(1024, 1);
    let target_id = RegionId::new(1024, 2);

    // The source region is deserialized from JSON in which the expression sits under
    // `partition.value_list` and the `partition_expr` field is an empty string.
    let legacy_partition_expr = range_expr("x", 0, 100).as_json_str().unwrap();
    let legacy_region: Region = serde_json::from_value(serde_json::json!({
        "id": source_id.as_u64(),
        "name": "",
        "partition": {
            "column_list": ["x"],
            "value_list": [legacy_partition_expr]
        },
        "partition_expr": "",
        "attrs": {}
    }))
    .unwrap();

    let source_route = RegionRoute {
        region: legacy_region,
        leader_peer: Some(Peer::empty(1)),
        ..Default::default()
    };
    let target_route = RegionRoute {
        region: Region {
            id: target_id,
            ..Default::default()
        },
        leader_peer: Some(Peer::empty(1)),
        ..Default::default()
    };
    let region_routes = vec![source_route, target_route];

    let sources = [RegionDescriptor {
        region_id: source_id,
        partition_expr: range_expr("x", 0, 100),
    }];
    let targets = [RegionDescriptor {
        region_id: target_id,
        partition_expr: range_expr("x", 0, 10),
    }];

    // The route check must accept the source region built from the JSON above.
    let result = RepartitionStart::ensure_route_present(
        Uuid::new_v4(),
        &region_routes,
        &sources,
        &targets,
    );
    assert!(result.is_ok());
}
}

View File

@@ -34,6 +34,7 @@ impl UpdateMetadata {
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
pending_deallocate_region_ids: &[store_api::storage::RegionId],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
@@ -54,6 +55,7 @@ impl UpdateMetadata {
.as_json_str()
.context(error::SerializePartitionExprSnafu)?;
region_route.set_leader_staging();
region_route.clear_ignore_all_writes();
}
for source in sources {
@@ -64,6 +66,10 @@ impl UpdateMetadata {
},
)?;
region_route.set_leader_staging();
if pending_deallocate_region_ids.contains(&source.region_id) {
// When a region is pending deallocation, it should ignore all writes.
region_route.set_ignore_all_writes();
}
}
Ok(region_routes)
@@ -86,6 +92,7 @@ impl UpdateMetadata {
group_id,
&ctx.persistent_ctx.sources,
&ctx.persistent_ctx.targets,
&ctx.persistent_ctx.pending_deallocate_region_ids,
region_routes,
)?;
@@ -169,6 +176,7 @@ mod tests {
group_id,
&[source_region],
&[target_region],
&[],
&region_routes,
)
.unwrap();
@@ -184,4 +192,53 @@ mod tests {
assert!(new_region_routes[1].is_leader_staging());
assert!(!new_region_routes[2].is_leader_staging());
}
#[test]
fn test_generate_region_routes_mark_pending_deallocate_reject_all_writes() {
    let table_id = 1024;
    let group_id = Uuid::new_v4();
    let pending_deallocate_region_id = RegionId::new(table_id, 1);
    let target_region_id = RegionId::new(table_id, 2);

    // Both routes are led by peer 1 and differ only in id and stored expression.
    let route_on_peer_1 = |id: RegionId, partition_expr: String| RegionRoute {
        region: Region {
            id,
            partition_expr,
            ..Default::default()
        },
        leader_peer: Some(Peer::empty(1)),
        ..Default::default()
    };
    let region_routes = vec![
        route_on_peer_1(
            pending_deallocate_region_id,
            range_expr("x", 0, 100).as_json_str().unwrap(),
        ),
        route_on_peer_1(target_region_id, String::new()),
    ];

    let sources = [RegionDescriptor {
        region_id: pending_deallocate_region_id,
        partition_expr: range_expr("x", 0, 100),
    }];
    let targets = [RegionDescriptor {
        region_id: target_region_id,
        partition_expr: range_expr("x", 0, 10),
    }];

    let new_region_routes = UpdateMetadata::apply_staging_region_routes(
        group_id,
        &sources,
        &targets,
        &[pending_deallocate_region_id],
        &region_routes,
    )
    .unwrap();

    // The pending-deallocate source is staged and flagged to ignore all writes.
    let staged_source = &new_region_routes[0];
    assert!(staged_source.is_leader_staging());
    assert!(staged_source.is_ignore_all_writes());

    // The target is staged but not flagged.
    let staged_target = &new_region_routes[1];
    assert!(staged_target.is_leader_staging());
    assert!(!staged_target.is_ignore_all_writes());
}
}

View File

@@ -104,3 +104,65 @@ impl UpdateMetadata {
Ok(())
}
}
#[cfg(test)]
mod tests {
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use store_api::storage::RegionId;
    use uuid::Uuid;
    use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
    use crate::procedure::repartition::plan::RegionDescriptor;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a route led by peer 1 whose leader is in the staging state and which
    /// has been flagged to ignore all writes.
    fn staging_route_ignoring_writes(region_id: RegionId, partition_expr: String) -> RegionRoute {
        let mut route = RegionRoute {
            region: Region {
                id: region_id,
                partition_expr,
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Staging),
            ..Default::default()
        };
        route.set_ignore_all_writes();
        route
    }

    #[test]
    fn test_exit_staging_region_routes_keep_reject_all_writes() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();

        let source_region = RegionDescriptor {
            region_id: RegionId::new(table_id, 1),
            partition_expr: range_expr("x", 0, 100),
        };
        let target_region = RegionDescriptor {
            region_id: RegionId::new(table_id, 2),
            partition_expr: range_expr("x", 0, 50),
        };

        let current_routes = [
            staging_route_ignoring_writes(
                source_region.region_id,
                range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            staging_route_ignoring_writes(
                target_region.region_id,
                range_expr("x", 0, 50).as_json_str().unwrap(),
            ),
        ];

        let new_region_routes = UpdateMetadata::exit_staging_region_routes(
            group_id,
            &[source_region],
            &[target_region],
            &current_routes,
        )
        .unwrap();

        // Exiting staging clears the staging state on both routes while the
        // ignore-all-writes flag stays set.
        let exited_source = &new_region_routes[0];
        assert!(!exited_source.is_leader_staging());
        assert!(exited_source.is_ignore_all_writes());
        let exited_target = &new_region_routes[1];
        assert!(!exited_target.is_leader_staging());
        assert!(exited_target.is_ignore_all_writes());
    }
}

View File

@@ -50,6 +50,7 @@ impl UpdateMetadata {
)?;
region_route.region.partition_expr = source.region.partition_expr.clone();
region_route.clear_leader_staging();
region_route.clear_ignore_all_writes();
}
for target in target_routes {
@@ -125,15 +126,19 @@ mod tests {
let group_id = Uuid::new_v4();
let table_id = 1024;
let region_routes = vec![
RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
{
let mut route = RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Staging),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Staging),
..Default::default()
};
route.set_ignore_all_writes();
route
},
RegionRoute {
region: Region {
@@ -182,11 +187,13 @@ mod tests {
)
.unwrap();
assert!(!new_region_routes[0].is_leader_staging());
assert!(!new_region_routes[0].is_ignore_all_writes());
assert_eq!(
new_region_routes[0].region.partition_expr,
range_expr("x", 0, 20).as_json_str().unwrap(),
);
assert!(!new_region_routes[1].is_leader_staging());
assert!(!new_region_routes[1].is_ignore_all_writes());
assert!(new_region_routes[2].is_leader_downgrading());
}
}

View File

@@ -145,6 +145,7 @@ mod tests {
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}
}

View File

@@ -39,6 +39,7 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64)
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
}
}

View File

@@ -186,7 +186,7 @@ impl MetricEngineInner {
merged_version == request_version,
InvalidRequestSnafu {
region_id: physical_region_id,
reason: "inconsistent partition rule version in batch"
reason: "inconsistent partition expr version in batch"
}
);
} else {
@@ -312,7 +312,7 @@ impl MetricEngineInner {
merged_version == request_version,
InvalidRequestSnafu {
region_id: logical_region_id,
reason: "inconsistent partition rule version in batch"
reason: "inconsistent partition expr version in batch"
}
);
} else {
@@ -569,7 +569,9 @@ mod tests {
};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::region_request::{
EnterStagingRequest, RegionRequest, StagingPartitionDirective,
};
use store_api::storage::ScanRequest;
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
@@ -1200,7 +1202,9 @@ mod tests {
.handle_request(
physical_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await

View File

@@ -26,7 +26,7 @@ use store_api::region_engine::{
};
use store_api::region_request::{
ApplyStagingManifestRequest, EnterStagingRequest, RegionFlushRequest, RegionPutRequest,
RegionRequest,
RegionRequest, StagingPartitionDirective,
};
use store_api::storage::{FileId, RegionId};
@@ -141,7 +141,9 @@ async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("x", 0, 50).as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
range_expr("x", 0, 50).as_json_str().unwrap(),
),
}),
)
.await
@@ -292,7 +294,9 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
.handle_request(
new_region_id_1,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
range_expr("tag_0", 0, 50).as_json_str().unwrap(),
),
}),
)
.await
@@ -417,7 +421,9 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: range_expr("tag_0", 0, 50).as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
range_expr("tag_0", 0, 50).as_json_str().unwrap(),
),
}),
)
.await
@@ -510,7 +516,9 @@ async fn test_apply_staging_manifest_change_edit_different_columns_fails_with_fo
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await
@@ -629,7 +637,9 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await
@@ -740,7 +750,9 @@ async fn test_split_repartition_causes_duplicate_data() {
.handle_request(
source_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: target_partition_expr_1.as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
target_partition_expr_1.as_json_str().unwrap(),
),
}),
)
.await
@@ -784,7 +796,9 @@ async fn test_split_repartition_causes_duplicate_data() {
.handle_request(
target_region_id_2,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: target_partition_expr_2.as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
target_partition_expr_2.as_json_str().unwrap(),
),
}),
)
.await
@@ -952,7 +966,9 @@ async fn test_merge_repartition_data_integrity() {
.handle_request(
target_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: target_partition_expr.as_json_str().unwrap(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
target_partition_expr.as_json_str().unwrap(),
),
}),
)
.await

View File

@@ -19,7 +19,9 @@ use common_recordbatch::RecordBatches;
use datatypes::value::Value;
use partition::expr::col;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionFlushRequest, RegionRequest};
use store_api::region_request::{
EnterStagingRequest, RegionFlushRequest, RegionRequest, StagingPartitionDirective,
};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
@@ -101,7 +103,9 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: new_partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
new_partition_expr.clone(),
),
}),
)
.await

View File

@@ -36,7 +36,7 @@ use store_api::region_engine::{
};
use store_api::region_request::{
ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest, RegionFlushRequest,
RegionPutRequest, RegionRequest, RegionTruncateRequest,
RegionPutRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective,
};
use store_api::storage::{RegionId, ScanRequest};
@@ -252,6 +252,52 @@ fn default_partition_expr() -> String {
range_expr("a", 0, 100).as_json_str().unwrap()
}
#[tokio::test]
async fn test_staging_reject_all_writes_rejects_put() {
    let mut env = TestEnv::new().await;
    let engine = env.create_engine(MitoConfig::default()).await;
    let region_id = RegionId::new(2048, 0);

    // Create the region, keeping its row schema for the put issued below.
    let create_request = CreateRequestBuilder::new().build();
    let schema = rows_schema(&create_request);
    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();

    // Enter staging with the directive that rejects every write.
    let enter_staging = RegionRequest::EnterStaging(EnterStagingRequest {
        partition_directive: StagingPartitionDirective::RejectAllWrites,
    });
    engine
        .handle_request(region_id, enter_staging)
        .await
        .unwrap();

    // A put must now fail with `RejectWrite`, reported as `StorageUnavailable`.
    let put = RegionRequest::Put(RegionPutRequest {
        rows: Rows {
            schema,
            rows: build_rows(0, 2),
        },
        hint: None,
        partition_expr_version: None,
    });
    let err = engine.handle_request(region_id, put).await.unwrap_err();
    assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
    assert_matches!(
        err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
        Error::RejectWrite { .. }
    );
}
#[tokio::test]
async fn test_staging_write_partition_expr_version() {
test_staging_write_partition_expr_version_with_format(false).await;
@@ -285,7 +331,9 @@ async fn test_staging_write_partition_expr_version_with_format(flat_format: bool
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await
@@ -316,7 +364,7 @@ async fn test_staging_write_partition_expr_version_with_format(flat_format: bool
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
assert_matches!(
err.into_inner().as_any().downcast_ref::<Error>().unwrap(),
Error::PartitionRuleVersionMismatch { .. }
Error::PartitionExprVersionMismatch { .. }
);
let compat_rows = Rows {
@@ -492,7 +540,9 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await
@@ -500,8 +550,12 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
let region = engine.get_region(region_id).unwrap();
let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
assert_eq!(
staging_partition_info.unwrap().partition_expr,
partition_expr
staging_partition_info
.unwrap()
.partition_expr()
.unwrap()
.to_string(),
partition_expr,
);
{
let manager = region.manifest_ctx.manifest_manager.read().await;
@@ -523,7 +577,9 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await
@@ -534,7 +590,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: "".to_string(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr("".to_string()),
}),
)
.await
@@ -629,7 +685,9 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await
@@ -843,7 +901,9 @@ async fn test_enter_staging_writes_partition_expr_change_action_with_format(flat
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await
@@ -905,7 +965,9 @@ async fn test_staging_exit_conflict_partition_expr_change_and_change_with_format
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await
@@ -1006,7 +1068,9 @@ async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await
@@ -1108,7 +1172,9 @@ async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.clone(),
partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
partition_expr.clone(),
),
}),
)
.await

View File

@@ -562,12 +562,12 @@ pub enum Error {
},
#[snafu(display(
"Partition rule version mismatch for region {}: request {}, expected {}",
"Partition expr version mismatch for region {}: request {}, expected {}",
region_id,
request_version,
expected_version
))]
PartitionRuleVersionMismatch {
PartitionExprVersionMismatch {
region_id: RegionId,
request_version: u64,
expected_version: u64,
@@ -1272,7 +1272,7 @@ impl ErrorExt for Error {
| InvalidScanIndex { .. }
| InvalidMeta { .. }
| InvalidRequest { .. }
| PartitionRuleVersionMismatch { .. }
| PartitionExprVersionMismatch { .. }
| FillDefault { .. }
| ConvertColumnDataType { .. }
| ColumnNotFound { .. }

View File

@@ -25,6 +25,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use common_base::hash::partition_expr_version;
use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use partition::expr::PartitionExpr;
@@ -36,7 +37,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::region_request::PathType;
use store_api::region_request::{PathType, StagingPartitionDirective};
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{FileId, RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;
@@ -168,8 +169,29 @@ pub type MitoRegionRef = Arc<MitoRegion>;
#[derive(Debug, Clone)]
pub(crate) struct StagingPartitionInfo {
pub(crate) partition_expr: String,
pub(crate) partition_expr_version: u64,
pub(crate) partition_directive: StagingPartitionDirective,
pub(crate) partition_rule_version: u64,
}
impl StagingPartitionInfo {
    /// Borrows the partition expression of the stored staging directive.
    ///
    /// Delegates to the directive's own `partition_expr()` accessor, so the result is
    /// `None` whenever that accessor returns `None`.
    pub(crate) fn partition_expr(&self) -> Option<&str> {
        let directive = &self.partition_directive;
        directive.partition_expr()
    }

    /// Wraps a staging directive together with the version marker derived from it.
    ///
    /// For `UpdatePartitionExpr` the marker is `partition_expr_version` of the carried
    /// expression; for `RejectAllWrites` the marker is `0`.
    pub(crate) fn from_partition_directive(partition_directive: StagingPartitionDirective) -> Self {
        Self {
            // Evaluated first: the borrow taken here ends before the directive is
            // moved into the struct by the next field.
            partition_rule_version: match &partition_directive {
                StagingPartitionDirective::RejectAllWrites => 0,
                StagingPartitionDirective::UpdatePartitionExpr(expr) => {
                    partition_expr_version(Some(expr))
                }
            },
            partition_directive,
        }
    }
}
impl MitoRegion {
@@ -801,7 +823,7 @@ impl MitoRegion {
}
staging_partition_info
.as_ref()
.map(|info| info.partition_expr.clone())
.and_then(|info| info.partition_expr().map(ToString::to_string))
} else {
let version = self.version();
version.metadata.partition_expr.clone()
@@ -813,12 +835,29 @@ impl MitoRegion {
let staging_partition_info = self.staging_partition_info.lock().unwrap();
staging_partition_info
.as_ref()
.map(|info| info.partition_expr_version)
.map(|info| info.partition_rule_version)
.unwrap_or_default()
} else {
self.version().metadata.partition_expr_version
}
}
/// Reports whether this region must reject every write while in staging mode.
///
/// Returns `false` when the region is not staging or no staging partition info is
/// recorded; otherwise returns `true` only if the recorded directive is
/// `RejectAllWrites`.
pub(crate) fn reject_all_writes_in_staging(&self) -> bool {
    if self.is_staging() {
        // The lock is taken only after the staging check passes.
        let guard = self.staging_partition_info.lock().unwrap();
        matches!(
            guard.as_ref().map(|info| &info.partition_directive),
            Some(StagingPartitionDirective::RejectAllWrites)
        )
    } else {
        false
    }
}
}
/// Context to update the region manifest.

View File

@@ -40,7 +40,7 @@ use store_api::region_request::{
AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
RegionOpenRequest, RegionRequest, RegionTruncateRequest,
RegionOpenRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -1057,8 +1057,8 @@ pub(crate) struct RegionChangeResult {
pub(crate) struct EnterStagingResult {
/// Region id.
pub(crate) region_id: RegionId,
/// The new partition expression to apply.
pub(crate) partition_expr: String,
/// The new staging partition directive to apply.
pub(crate) partition_directive: StagingPartitionDirective,
/// Result sender.
pub(crate) sender: OptionOutputTx,
/// Result from the manifest manager.

View File

@@ -1140,7 +1140,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
DdlRequest::EnterStaging(req) => {
self.handle_enter_staging_request(
ddl.region_id,
req.partition_expr,
req.partition_directive,
ddl.sender,
)
.await;

View File

@@ -76,17 +76,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
// If the partition expr mismatch, return error.
if staging_partition_info
let staging_partition_expr = staging_partition_info
.as_ref()
.map(|info| &info.partition_expr)
!= Some(&request.partition_expr)
{
.and_then(|info| info.partition_expr());
// If the partition expr mismatch, return error.
if staging_partition_expr != Some(request.partition_expr.as_str()) {
sender.send(
StagingPartitionExprMismatchSnafu {
manifest_expr: staging_partition_info
.as_ref()
.map(|info| info.partition_expr.clone()),
.and_then(|info| info.partition_expr().map(ToString::to_string)),
request_expr: request.partition_expr,
}
.fail(),

View File

@@ -14,10 +14,9 @@
use std::time::Instant;
use common_base::hash::partition_expr_version;
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::EnterStagingRequest;
use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
use store_api::storage::RegionId;
use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
@@ -34,27 +33,27 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_enter_staging_request(
&mut self,
region_id: RegionId,
partition_expr: String,
partition_directive: StagingPartitionDirective,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};
// If the region is already in staging mode, verify the partition expr matches.
// If the region is already in staging mode, verify the partition directive matches.
if region.is_staging() {
let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
// If the partition expr mismatch, return error.
// If the partition directive mismatches, return error.
if staging_partition_info
.as_ref()
.map(|info| &info.partition_expr)
!= Some(&partition_expr)
.map(|info| &info.partition_directive)
!= Some(&partition_directive)
{
sender.send(Err(StagingPartitionExprMismatchSnafu {
manifest_expr: staging_partition_info
.as_ref()
.map(|info| info.partition_expr.clone()),
request_expr: partition_expr,
.and_then(|info| info.partition_expr().map(ToString::to_string)),
request_expr: format!("{:?}", partition_directive),
}
.build()));
return;
@@ -91,16 +90,21 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender,
request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }),
request: DdlRequest::EnterStaging(EnterStagingRequest {
partition_directive: partition_directive.clone(),
}),
});
return;
}
self.handle_enter_staging(region, partition_expr, sender);
self.handle_enter_staging(region, partition_directive, sender);
}
async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> {
async fn enter_staging(
region: &MitoRegionRef,
partition_directive: &StagingPartitionDirective,
) -> Result<()> {
let now = Instant::now();
// First step: clear all staging manifest files.
{
@@ -123,6 +127,20 @@ impl<S: LogStore> RegionWorkerLoop<S> {
);
}
let partition_expr = match partition_directive {
StagingPartitionDirective::UpdatePartitionExpr(partition_expr) => {
partition_expr.clone()
}
StagingPartitionDirective::RejectAllWrites => {
info!(
"Enter staging with reject all writes, region_id: {}",
region.region_id
);
// Rejecting all writes is only an in-memory flag, so there is no need to write a new staging manifest.
return Ok(());
}
};
// Second step: write new staging manifest.
let change = RegionPartitionExprChange {
partition_expr: Some(partition_expr.clone()),
@@ -140,7 +158,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
fn handle_enter_staging(
&self,
region: MitoRegionRef,
partition_expr: String,
partition_directive: StagingPartitionDirective,
sender: OptionOutputTx,
) {
if let Err(e) = region.set_entering_staging() {
@@ -152,7 +170,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let request_sender = self.sender.clone();
common_runtime::spawn_global(async move {
let now = Instant::now();
let result = Self::enter_staging(&region, partition_expr.clone()).await;
let result = Self::enter_staging(&region, &partition_directive).await;
match result {
Ok(_) => {
info!(
@@ -184,7 +202,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: region.region_id,
sender,
result,
partition_expr,
partition_directive,
}),
};
listener
@@ -224,12 +242,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if enter_staging_result.result.is_ok() {
info!(
"Updating region {} staging partition expr to {}",
region.region_id, enter_staging_result.partition_expr
"Updating region {} staging partition directive to {:?}",
region.region_id, enter_staging_result.partition_directive
);
Self::update_region_staging_partition_info(
&region,
enter_staging_result.partition_expr,
enter_staging_result.partition_directive,
);
region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
} else {
@@ -243,13 +261,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
}
fn update_region_staging_partition_info(region: &MitoRegionRef, partition_expr: String) {
fn update_region_staging_partition_info(
region: &MitoRegionRef,
partition_directive: StagingPartitionDirective,
) {
let mut staging_partition_info = region.staging_partition_info.lock().unwrap();
debug_assert!(staging_partition_info.is_none());
let partition_expr_version = partition_expr_version(Some(&partition_expr));
*staging_partition_info = Some(StagingPartitionInfo {
partition_expr,
partition_expr_version,
});
*staging_partition_info = Some(StagingPartitionInfo::from_partition_directive(
partition_directive,
));
}
}

View File

@@ -26,7 +26,7 @@ use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::error::{
InvalidRequestSnafu, PartitionRuleVersionMismatchSnafu, RegionStateSnafu, RejectWriteSnafu,
InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionStateSnafu, RejectWriteSnafu,
Result,
};
use crate::metrics;
@@ -253,6 +253,13 @@ impl<S> RegionWorkerLoop<S> {
match region.state() {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging) => {
if region.reject_all_writes_in_staging() {
sender_req
.sender
.send(RejectWriteSnafu { region_id }.fail());
continue;
}
let region_ctx = RegionWriteCtx::new(
region.region_id,
&region.version_control,
@@ -305,6 +312,12 @@ impl<S> RegionWorkerLoop<S> {
else {
continue;
};
if region.reject_all_writes_in_staging() {
sender_req
.sender
.send(RejectWriteSnafu { region_id }.fail());
continue;
}
let expected_version = region.expected_partition_expr_version();
if let Err(e) = check_partition_expr_version(
region_id,
@@ -380,7 +393,12 @@ impl<S> RegionWorkerLoop<S> {
continue;
};
match region.state() {
RegionRoleState::Leader(RegionLeaderState::Writable) => {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging) => {
if region.reject_all_writes_in_staging() {
bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
continue;
}
let region_ctx = RegionWriteCtx::new(
region.region_id,
&region.version_control,
@@ -420,6 +438,10 @@ impl<S> RegionWorkerLoop<S> {
let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender) else {
continue;
};
if region.reject_all_writes_in_staging() {
bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
continue;
}
let expected_version = region.expected_partition_expr_version();
if let Err(e) = check_partition_expr_version(
region_id,
@@ -505,7 +527,7 @@ fn check_partition_expr_version(
Some(value) => value,
};
if request_version != expected_version {
return PartitionRuleVersionMismatchSnafu {
return PartitionExprVersionMismatchSnafu {
region_id,
request_version,
expected_version,

View File

@@ -21,7 +21,7 @@ use common_meta::key::TableMetadataManager;
use common_meta::key::table_route::TableRouteValue;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{LegacyPartition, Region, RegionRoute};
use common_meta::rpc::router::{Region, RegionRoute};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use moka::future::CacheBuilder;
@@ -122,6 +122,14 @@ pub(crate) async fn create_partition_rule_manager(
));
let regions = vec![1u32, 2, 3];
let region_wal_options = new_test_region_wal_options(regions.clone());
let expr_str = serde_json::json!({
"Expr": {
"lhs": {"Column": "a"},
"op": "GtEq",
"rhs": {"Value": {"Int32": 50}}
}
})
.to_string();
table_metadata_manager
.create_table_metadata(
new_test_table_info(1, "table_1", regions.clone().into_iter()),
@@ -130,7 +138,6 @@ pub(crate) async fn create_partition_rule_manager(
region: Region {
id: 3.into(),
name: "r1".to_string(),
partition: None,
attrs: BTreeMap::new(),
partition_expr: PartitionExpr::new(
Operand::Column("a".to_string()),
@@ -144,12 +151,12 @@ pub(crate) async fn create_partition_rule_manager(
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region {
id: 2.into(),
name: "r2".to_string(),
partition: None,
attrs: BTreeMap::new(),
partition_expr: PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
@@ -171,23 +178,26 @@ pub(crate) async fn create_partition_rule_manager(
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
RegionRoute {
region: Region {
id: 1.into(),
name: "r3".to_string(),
// Keep the old partition definition to test compatibility.
partition: Some(LegacyPartition {
column_list: vec![b"a".to_vec()],
value_list: vec![b"{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"Int32\":50}}}}".to_vec()],
}),
attrs: BTreeMap::new(),
partition_expr: Default::default(),
},
// Keep legacy `partition` payload to test compatibility.
region: serde_json::from_value(serde_json::json!({
"id": 1,
"name": "r3",
"partition": {
"column_list": ["a"],
"value_list": [expr_str]
},
"attrs": {},
"partition_expr": ""
}))
.unwrap(),
leader_peer: Some(Peer::new(1, "")),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
write_route_policy: None,
},
]),
region_wal_options.clone(),

View File

@@ -57,6 +57,12 @@ pub fn create_partitions_with_version_from_region_routes(
) -> common_meta::error::Result<Vec<PartitionInfoWithVersion>> {
let mut partitions = Vec::with_capacity(region_routes.len());
for r in region_routes {
// Ignore regions marked as reject-all-writes; they should not participate
// in writable partition-cache construction.
if r.is_ignore_all_writes() {
continue;
}
let expr_json = r.region.partition_expr();
let partition_expr_version = if expr_json.is_empty() {
None
@@ -127,3 +133,50 @@ pub fn new_partition_info_cache(
|ident| matches!(ident, CacheIdent::TableId(_)),
)
}
#[cfg(test)]
mod tests {
    use common_base::hash::partition_expr_version;
    use common_meta::rpc::router::{Region, RegionRoute, WriteRoutePolicy};
    use store_api::storage::RegionId;
    use super::create_partitions_with_version_from_region_routes;

    /// A route whose write policy is `IgnoreAllWrites` must be left out of the
    /// produced partition list, while a route without a policy is kept together
    /// with the version derived from its partition expression.
    #[test]
    fn test_create_partitions_with_version_excludes_reject_all_writes() {
        let table_id = 1024;
        let expr_json =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;

        // Both regions share the same partition expression; only the write
        // policy of their routes differs.
        let region_with_expr = |region_number| Region {
            id: RegionId::new(table_id, region_number),
            partition_expr: expr_json.to_string(),
            ..Default::default()
        };
        let ignored_route = RegionRoute {
            region: region_with_expr(1),
            write_route_policy: Some(WriteRoutePolicy::IgnoreAllWrites),
            ..Default::default()
        };
        let writable_route = RegionRoute {
            region: region_with_expr(2),
            ..Default::default()
        };
        let region_routes = vec![ignored_route, writable_route];

        let partitions =
            create_partitions_with_version_from_region_routes(table_id, &region_routes).unwrap();

        // Only the second region survives the filtering.
        assert_eq!(1, partitions.len());
        let kept = &partitions[0];
        assert_eq!(RegionId::new(table_id, 2), kept.id);
        assert_eq!(
            Some(partition_expr_version(Some(expr_json))),
            kept.partition_expr_version
        );
    }
}

View File

@@ -1476,26 +1476,51 @@ impl RegionBulkInsertsRequest {
}
}
/// Request to stage a region with a new region rule(partition expression).
/// Request to stage a region with a new partition directive.
///
/// This request transitions a region into the staging mode.
/// It first flushes the memtable for the old region rule if it is not empty,
/// then enters the staging mode with the new region rule.
/// It first flushes the memtable for the old partition expression if it is not
/// empty, then enters the staging mode with the new directive.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StagingPartitionDirective {
    /// Carries the new partition expression the region should stage with.
    UpdatePartitionExpr(String),
    /// Directs the region to reject every write while staging; carries no
    /// partition expression.
    RejectAllWrites,
}

impl StagingPartitionDirective {
    /// Borrows the partition expression held by this directive.
    ///
    /// Returns `Some` with the expression for
    /// [`StagingPartitionDirective::UpdatePartitionExpr`] and `None` for
    /// [`StagingPartitionDirective::RejectAllWrites`].
    pub fn partition_expr(&self) -> Option<&str> {
        match self {
            Self::RejectAllWrites => None,
            Self::UpdatePartitionExpr(expr) => Some(expr.as_str()),
        }
    }
}
#[derive(Debug, Clone)]
pub struct EnterStagingRequest {
/// The partition expression of the staging region.
pub partition_expr: String,
/// The staging partition directive of the region.
pub partition_directive: StagingPartitionDirective,
}
impl EnterStagingRequest {
    /// Creates an enter-staging request whose directive is
    /// [`StagingPartitionDirective::UpdatePartitionExpr`] wrapping the given
    /// `partition_expr`.
    pub fn with_partition_expr(partition_expr: String) -> Self {
        let partition_directive = StagingPartitionDirective::UpdatePartitionExpr(partition_expr);
        Self {
            partition_directive,
        }
    }
}
/// This request is used as part of the region repartition.
///
/// After a region has entered staging mode with a new region rule (partition
/// After a region has entered staging mode with a new partition expression
/// expression) and a separate process (for example, `remap_manifests`) has
/// generated the new file assignments for the staging region, this request
/// applies that generated manifest to the region.
///
/// In practice, this means:
/// - The `partition_expr` identifies the staging region rule that the manifest
/// - The `partition_expr` identifies the staging partition expression that the manifest
/// was generated for.
/// - `central_region_id` specifies which region holds the staging blob storage
/// where the manifest was written during the `remap_manifests` operation.