feat: add OpenRegionReason and capability check for open_region heartbeat

This commit is contained in:
copilot-swe-agent[bot]
2026-05-26 18:02:35 +00:00
committed by GitHub
parent f513b77ccc
commit 6f0e1bba4f
5 changed files with 115 additions and 16 deletions

View File

@@ -179,12 +179,40 @@ impl Display for OpenRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"OpenRegion(region_ident={}, region_storage_path={})",
self.region_ident, self.region_storage_path
"OpenRegion(region_ident={}, region_storage_path={}, reason={:?})",
self.region_ident, self.region_storage_path, self.reason
)
}
}
/// The reason why a region is being opened.
///
/// Carried in the `reason` field of [`OpenRegion`] so that the receiver of the
/// instruction can distinguish an ordinary open from one caused by a region
/// migration or a failover.
///
/// Variants are serialized by name in `snake_case`, i.e. `normal`,
/// `region_migration` and `region_failover`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OpenRegionReason {
/// Normal region open, e.g., on datanode startup for its own regions.
///
/// This is the `Default` variant; an `OpenRegion` payload that carries no
/// `reason` field deserializes to it.
#[default]
Normal,
/// Region is being migrated to this datanode (manual or auto-rebalance).
RegionMigration,
/// Region is being opened as part of failover to this datanode.
RegionFailover,
}
impl OpenRegionReason {
    /// Reports whether this is the plain [`OpenRegionReason::Normal`] case.
    ///
    /// Used as the `skip_serializing_if` predicate of `OpenRegion::reason`, so a
    /// normal open is serialized without a `reason` field.
    fn is_normal(&self) -> bool {
        match self {
            Self::Normal => true,
            Self::RegionMigration | Self::RegionFailover => false,
        }
    }

    /// Tells whether opening a region for this reason needs the datanode to be
    /// backed by object storage.
    ///
    /// A region that arrives through migration or failover was written by another
    /// datanode, so its data has to live in shared object storage reachable from
    /// both the source and the destination. A datanode that only has local file
    /// storage would not be able to read that data.
    ///
    /// Returns `true` for [`OpenRegionReason::RegionMigration`] and
    /// [`OpenRegionReason::RegionFailover`], `false` for
    /// [`OpenRegionReason::Normal`].
    pub fn requires_object_storage(&self) -> bool {
        match self {
            Self::RegionMigration | Self::RegionFailover => true,
            Self::Normal => false,
        }
    }
}
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
@@ -196,6 +224,9 @@ pub struct OpenRegion {
pub region_wal_options: HashMap<RegionNumber, String>,
#[serde(default)]
pub skip_wal_replay: bool,
/// The reason why this region is being opened.
#[serde(default, skip_serializing_if = "OpenRegionReason::is_normal")]
pub reason: OpenRegionReason,
}
impl OpenRegion {
@@ -212,8 +243,15 @@ impl OpenRegion {
region_options,
region_wal_options,
skip_wal_replay,
reason: OpenRegionReason::Normal,
}
}
/// Sets the reason for opening this region.
pub fn with_reason(mut self, reason: OpenRegionReason) -> Self {
self.reason = reason;
self
}
}
/// The instruction of downgrading leader region.
@@ -1368,6 +1406,7 @@ mod tests {
region_options,
region_wal_options: HashMap::new(),
skip_wal_replay: false,
reason: OpenRegionReason::Normal,
};
assert_eq!(expected, deserialized);
}

View File

@@ -98,7 +98,8 @@ impl HeartbeatTask {
Arc::new(SuspendHandler::new(region_server.suspend_state())),
Arc::new(
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend)
.with_open_region_parallelism(opts.init_regions_parallelism),
.with_open_region_parallelism(opts.init_regions_parallelism)
.with_is_object_storage(opts.storage.is_object_storage()),
),
Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
]));

View File

@@ -60,6 +60,8 @@ pub struct RegionHeartbeatResponseHandler {
open_region_parallelism: usize,
gc_tasks: TaskTracker<GcReport>,
kv_backend: KvBackendRef,
/// Whether the datanode's default storage is backed by object storage.
is_object_storage: bool,
}
#[async_trait::async_trait]
@@ -105,6 +107,7 @@ impl RegionHeartbeatResponseHandler {
open_region_parallelism: (num_cpus::get() / 2).max(1),
gc_tasks: TaskTracker::new(),
kv_backend,
is_object_storage: false,
}
}
@@ -114,6 +117,12 @@ impl RegionHeartbeatResponseHandler {
self
}
/// Builder-style setter recording whether the datanode is backed by object
/// storage; consumes the handler and returns it with the flag updated.
pub fn with_is_object_storage(self, is_object_storage: bool) -> Self {
    let mut handler = self;
    handler.is_object_storage = is_object_storage;
    handler
}
fn build_handler(
&self,
instruction: &Instruction,
@@ -123,6 +132,7 @@ impl RegionHeartbeatResponseHandler {
Instruction::OpenRegions(_) => Ok(Some(Box::new(
OpenRegionsHandler {
open_region_parallelism: self.open_region_parallelism,
is_object_storage: self.is_object_storage,
}
.into(),
))),

View File

@@ -14,6 +14,7 @@
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_provider::prepare_wal_options;
use common_telemetry::warn;
use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest};
use store_api::storage::RegionId;
@@ -22,6 +23,12 @@ use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
/// Instruction handler for `Instruction::OpenRegions` heartbeat instructions.
pub struct OpenRegionsHandler {
/// Parallelism setting used when opening regions.
///
/// NOTE(review): presumably the maximum number of regions opened
/// concurrently — confirm against the part of the handler that consumes it.
pub open_region_parallelism: usize,
/// Whether the datanode is backed by object storage (e.g., S3, OSS, etc.).
///
/// When `false`, region migration and failover open requests are rejected
/// because the region data only exists on the source node's local storage
/// and cannot be accessed by this node.
///
/// The rejection covers the whole instruction: if any region in the batch
/// has a reason that requires object storage, a failure reply is returned
/// and none of the regions in that batch are opened.
pub is_object_storage: bool,
}
#[async_trait::async_trait]
@@ -32,6 +39,35 @@ impl InstructionHandler for OpenRegionsHandler {
ctx: &HandlerContext,
open_regions: Self::Instruction,
) -> Option<InstructionReply> {
// Check capability: if this datanode is not backed by object storage, reject
// open requests that require shared object storage (migration/failover).
if !self.is_object_storage {
let rejected: Vec<_> = open_regions
.iter()
.filter(|r| r.reason.requires_object_storage())
.collect();
if let Some(first) = rejected.first() {
let reason = &first.reason;
let regions: Vec<_> = rejected
.iter()
.map(|r| r.region_ident.get_region_id())
.collect();
warn!(
"Rejecting open_region instruction for regions {:?}: reason={:?} requires \
object storage, but this datanode uses local file storage.",
regions, reason,
);
return Some(InstructionReply::OpenRegions(SimpleReply {
result: false,
error: Some(format!(
"Cannot open regions {regions:?} on a datanode without object storage: \
open reason {:?} requires shared object storage.",
reason,
)),
}));
}
}
let requests = open_regions
.into_iter()
.map(|open_region| {
@@ -41,6 +77,7 @@ impl InstructionHandler for OpenRegionsHandler {
mut region_options,
region_wal_options,
skip_wal_replay,
..
} = open_region;
let region_id = RegionId::new(region_ident.table_id, region_ident.region_number);
prepare_wal_options(&mut region_options, region_id, &region_wal_options);
@@ -109,6 +146,7 @@ mod tests {
region_options: HashMap::new(),
region_wal_options: HashMap::new(),
skip_wal_replay: false,
reason: Default::default(),
})
.collect();

View File

@@ -18,7 +18,9 @@ use std::ops::Div;
use api::v1::meta::MailboxMessage;
use common_meta::RegionIdent;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::instruction::{
Instruction, InstructionReply, OpenRegion, OpenRegionReason, SimpleReply,
};
use common_meta::key::datanode_table::RegionInfo;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
@@ -31,6 +33,7 @@ use tokio::time::Instant;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
@@ -67,6 +70,10 @@ impl OpenCandidateRegion {
let region_ids = ctx.persistent_ctx.region_ids.clone();
let from_peer_id = ctx.persistent_ctx.from_peer.id;
let to_peer_id = ctx.persistent_ctx.to_peer.id;
let open_reason = match ctx.persistent_ctx.trigger_reason {
RegionMigrationTriggerReason::Failover => OpenRegionReason::RegionFailover,
_ => OpenRegionReason::RegionMigration,
};
let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
let mut open_regions = Vec::with_capacity(region_ids.len());
@@ -86,18 +93,21 @@ impl OpenCandidateRegion {
engine,
} = datanode_table_value.region_info.clone();
open_regions.push(OpenRegion::new(
RegionIdent {
datanode_id: to_peer_id,
table_id,
region_number,
engine,
},
&region_storage_path,
region_options,
region_wal_options,
true,
));
open_regions.push(
OpenRegion::new(
RegionIdent {
datanode_id: to_peer_id,
table_id,
region_number,
engine,
},
&region_storage_path,
region_options,
region_wal_options,
true,
)
.with_reason(open_reason.clone()),
);
}
Ok(Instruction::OpenRegions(open_regions))
@@ -244,6 +254,7 @@ mod tests {
region_options: Default::default(),
region_wal_options: Default::default(),
skip_wal_replay: true,
reason: OpenRegionReason::RegionMigration,
}])
}