Add an open reason to the OpenRegion instruction and reject migration or
failover opens on datanodes without object storage.

What the patch does:
  * common/meta: adds the `OpenRegionReason` enum (Normal, RegionMigration,
    RegionFailover) and an `OpenRegion::reason` field. The field defaults to
    Normal when absent and is omitted from serialized output when Normal.
  * datanode: `OpenRegionsHandler` gets an `is_object_storage` flag. When the
    flag is false and any region in the instruction has a reason for which
    `requires_object_storage()` is true, the handler logs a warning and
    returns a failed `SimpleReply` before opening anything.
  * meta-srv: `OpenCandidateRegion` tags each `OpenRegion` with
    RegionFailover when the trigger reason is Failover, otherwise with
    RegionMigration.

Repairs made to this copy of the patch:
  * Line structure restored; the text had been collapsed onto a few lines.
    Indentation is reconstructed from rustfmt conventions, so apply with
    whitespace tolerance if the target tree differs.
  * `&region_wal_options` and `&region_storage_path` restored; the leading
    ampersand sequence had been turned into a registered-trademark sign.
  * Return type `Option<InstructionReply>` restored in the handler signature.
  * `OpenRegionReason` now derives `Copy`, so the per-region `.clone()` in
    `OpenCandidateRegion` is gone.

Review notes (unverified from this patch alone, please confirm):
  * The context lines `pub region_wal_options: HashMap,` and
    `gc_tasks: TaskTracker,` look like they lost their generic arguments in
    extraction. Restore them from the target tree before applying.
  * `RegionHeartbeatResponseHandler::new` defaults `is_object_storage` to
    false, so any construction site that does not call
    `with_is_object_storage` rejects migration and failover opens.
  * A single offending region fails the whole instruction, including regions
    whose reason is Normal, and the reply reports only the first rejected
    region's reason.
  * The flag comes from `opts.storage.is_object_storage()`, presumably the
    default storage only; regions on a non-default storage may be judged
    incorrectly.
  * The `_ =>` arm maps every non-Failover trigger to RegionMigration, so a
    future trigger variant will silently take that path.

diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 3fa6b1bad0..ab97823a6b 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -179,12 +179,40 @@ impl Display for OpenRegion {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "OpenRegion(region_ident={}, region_storage_path={})",
-            self.region_ident, self.region_storage_path
+            "OpenRegion(region_ident={}, region_storage_path={}, reason={:?})",
+            self.region_ident, self.region_storage_path, self.reason
         )
     }
 }
 
+/// The reason why a region is being opened.
+#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum OpenRegionReason {
+    /// Normal region open, e.g., on datanode startup for its own regions.
+    #[default]
+    Normal,
+    /// Region is being migrated to this datanode (manual or auto-rebalance).
+    RegionMigration,
+    /// Region is being opened as part of failover to this datanode.
+    RegionFailover,
+}
+
+impl OpenRegionReason {
+    fn is_normal(&self) -> bool {
+        matches!(self, Self::Normal)
+    }
+
+    /// Returns true if this open reason requires the datanode to be backed by object storage.
+    ///
+    /// When a region is migrated or failed over to this node, the region data must reside in
+    /// a shared object storage that both the source and destination datanodes can access.
+    /// If the datanode only has local file storage, the data would not be accessible.
+    pub fn requires_object_storage(&self) -> bool {
+        matches!(self, Self::RegionMigration | Self::RegionFailover)
+    }
+}
+
 #[serde_with::serde_as]
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct OpenRegion {
@@ -196,6 +224,9 @@ pub struct OpenRegion {
     pub region_wal_options: HashMap,
     #[serde(default)]
     pub skip_wal_replay: bool,
+    /// The reason why this region is being opened.
+    #[serde(default, skip_serializing_if = "OpenRegionReason::is_normal")]
+    pub reason: OpenRegionReason,
 }
 
 impl OpenRegion {
@@ -212,8 +243,15 @@ impl OpenRegion {
             region_options,
             region_wal_options,
             skip_wal_replay,
+            reason: OpenRegionReason::Normal,
         }
     }
+
+    /// Sets the reason for opening this region.
+    pub fn with_reason(mut self, reason: OpenRegionReason) -> Self {
+        self.reason = reason;
+        self
+    }
 }
 
 /// The instruction of downgrading leader region.
@@ -1368,6 +1406,7 @@ mod tests {
             region_options,
             region_wal_options: HashMap::new(),
             skip_wal_replay: false,
+            reason: OpenRegionReason::Normal,
         };
         assert_eq!(expected, deserialized);
     }
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index 3d77251953..20ef2c1e5e 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -98,7 +98,8 @@ impl HeartbeatTask {
             Arc::new(SuspendHandler::new(region_server.suspend_state())),
             Arc::new(
                 RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend)
-                    .with_open_region_parallelism(opts.init_regions_parallelism),
+                    .with_open_region_parallelism(opts.init_regions_parallelism)
+                    .with_is_object_storage(opts.storage.is_object_storage()),
             ),
             Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
         ]));
diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs
index 10948a3e7c..3b54beef82 100644
--- a/src/datanode/src/heartbeat/handler.rs
+++ b/src/datanode/src/heartbeat/handler.rs
@@ -60,6 +60,8 @@ pub struct RegionHeartbeatResponseHandler {
     open_region_parallelism: usize,
     gc_tasks: TaskTracker,
     kv_backend: KvBackendRef,
+    /// Whether the datanode's default storage is backed by object storage.
+    is_object_storage: bool,
 }
 
 #[async_trait::async_trait]
@@ -105,6 +107,7 @@ impl RegionHeartbeatResponseHandler {
             open_region_parallelism: (num_cpus::get() / 2).max(1),
             gc_tasks: TaskTracker::new(),
             kv_backend,
+            is_object_storage: false,
         }
     }
 
@@ -114,6 +117,12 @@ impl RegionHeartbeatResponseHandler {
         self
     }
 
+    /// Sets whether the datanode is backed by object storage.
+    pub fn with_is_object_storage(mut self, is_object_storage: bool) -> Self {
+        self.is_object_storage = is_object_storage;
+        self
+    }
+
     fn build_handler(
         &self,
         instruction: &Instruction,
@@ -123,6 +132,7 @@ impl RegionHeartbeatResponseHandler {
             Instruction::OpenRegions(_) => Ok(Some(Box::new(
                 OpenRegionsHandler {
                     open_region_parallelism: self.open_region_parallelism,
+                    is_object_storage: self.is_object_storage,
                 }
                 .into(),
             ))),
diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs
index 56c07a3efe..926c28af3e 100644
--- a/src/datanode/src/heartbeat/handler/open_region.rs
+++ b/src/datanode/src/heartbeat/handler/open_region.rs
@@ -14,6 +14,7 @@
 
 use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
 use common_meta::wal_provider::prepare_wal_options;
+use common_telemetry::warn;
 use store_api::path_utils::table_dir;
 use store_api::region_request::{PathType, RegionOpenRequest};
 use store_api::storage::RegionId;
@@ -22,6 +23,12 @@ use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
 
 pub struct OpenRegionsHandler {
     pub open_region_parallelism: usize,
+    /// Whether the datanode is backed by object storage (e.g., S3, OSS, etc.).
+    ///
+    /// When `false`, region migration and failover open requests are rejected
+    /// because the region data only exists on the source node's local storage
+    /// and cannot be accessed by this node.
+    pub is_object_storage: bool,
 }
 
 #[async_trait::async_trait]
@@ -32,6 +39,35 @@ impl InstructionHandler for OpenRegionsHandler {
         ctx: &HandlerContext,
         open_regions: Self::Instruction,
     ) -> Option<InstructionReply> {
+        // Check capability: if this datanode is not backed by object storage, reject
+        // open requests that require shared object storage (migration/failover).
+        if !self.is_object_storage {
+            let rejected: Vec<_> = open_regions
+                .iter()
+                .filter(|r| r.reason.requires_object_storage())
+                .collect();
+            if let Some(first) = rejected.first() {
+                let reason = &first.reason;
+                let regions: Vec<_> = rejected
+                    .iter()
+                    .map(|r| r.region_ident.get_region_id())
+                    .collect();
+                warn!(
+                    "Rejecting open_region instruction for regions {:?}: reason={:?} requires \
+                    object storage, but this datanode uses local file storage.",
+                    regions, reason,
+                );
+                return Some(InstructionReply::OpenRegions(SimpleReply {
+                    result: false,
+                    error: Some(format!(
+                        "Cannot open regions {regions:?} on a datanode without object storage: \
+                        open reason {:?} requires shared object storage.",
+                        reason,
+                    )),
+                }));
+            }
+        }
+
         let requests = open_regions
             .into_iter()
             .map(|open_region| {
@@ -41,6 +77,7 @@ impl InstructionHandler for OpenRegionsHandler {
                     mut region_options,
                     region_wal_options,
                     skip_wal_replay,
+                    ..
                 } = open_region;
                 let region_id = RegionId::new(region_ident.table_id, region_ident.region_number);
                 prepare_wal_options(&mut region_options, region_id, &region_wal_options);
@@ -109,6 +146,7 @@ mod tests {
                 region_options: HashMap::new(),
                 region_wal_options: HashMap::new(),
                 skip_wal_replay: false,
+                reason: Default::default(),
             })
             .collect();
 
diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
index 0c0e5de5d7..36aa39d53b 100644
--- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs
@@ -18,7 +18,9 @@ use std::ops::Div;
 use api::v1::meta::MailboxMessage;
 use common_meta::RegionIdent;
 use common_meta::distributed_time_constants::default_distributed_time_constants;
-use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
+use common_meta::instruction::{
+    Instruction, InstructionReply, OpenRegion, OpenRegionReason, SimpleReply,
+};
 use common_meta::key::datanode_table::RegionInfo;
 use common_procedure::{Context as ProcedureContext, Status};
 use common_telemetry::info;
@@ -31,6 +33,7 @@ use tokio::time::Instant;
 use crate::error::{self, Result};
 use crate::handler::HeartbeatMailbox;
 use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
+use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
 use crate::procedure::region_migration::{Context, State};
 use crate::service::mailbox::Channel;
 
@@ -67,6 +70,10 @@ impl OpenCandidateRegion {
         let region_ids = ctx.persistent_ctx.region_ids.clone();
         let from_peer_id = ctx.persistent_ctx.from_peer.id;
         let to_peer_id = ctx.persistent_ctx.to_peer.id;
+        let open_reason = match ctx.persistent_ctx.trigger_reason {
+            RegionMigrationTriggerReason::Failover => OpenRegionReason::RegionFailover,
+            _ => OpenRegionReason::RegionMigration,
+        };
         let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
         let mut open_regions = Vec::with_capacity(region_ids.len());
 
@@ -86,18 +93,21 @@ impl OpenCandidateRegion {
                 engine,
             } = datanode_table_value.region_info.clone();
 
-            open_regions.push(OpenRegion::new(
-                RegionIdent {
-                    datanode_id: to_peer_id,
-                    table_id,
-                    region_number,
-                    engine,
-                },
-                &region_storage_path,
-                region_options,
-                region_wal_options,
-                true,
-            ));
+            open_regions.push(
+                OpenRegion::new(
+                    RegionIdent {
+                        datanode_id: to_peer_id,
+                        table_id,
+                        region_number,
+                        engine,
+                    },
+                    &region_storage_path,
+                    region_options,
+                    region_wal_options,
+                    true,
+                )
+                .with_reason(open_reason),
+            );
         }
 
         Ok(Instruction::OpenRegions(open_regions))
@@ -244,6 +254,7 @@ mod tests {
             region_options: Default::default(),
             region_wal_options: Default::default(),
             skip_wal_replay: true,
+            reason: OpenRegionReason::RegionMigration,
         }])
     }
 