From cbc8657f77a065a91f1a7c9b245de349a530b5f5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 26 May 2026 18:44:56 +0000 Subject: [PATCH] Add open region capability plumbing --- src/cmd/src/datanode/scanbench.rs | 1 + src/common/meta/src/instruction.rs | 149 ++++++++++++++++-- src/datanode/src/heartbeat.rs | 3 +- src/datanode/src/heartbeat/handler.rs | 10 -- .../src/heartbeat/handler/open_region.rs | 43 +---- src/datanode/src/region_server.rs | 5 + src/datanode/src/utils.rs | 1 + src/file-engine/src/region.rs | 2 + .../region_migration/open_candidate_region.rs | 73 ++++++++- src/metric-engine/src/engine.rs | 5 + src/metric-engine/src/engine/open.rs | 3 + src/metric-engine/src/engine/sync/region.rs | 1 + src/metric-engine/src/test_util.rs | 1 + src/mito2/src/engine/alter_test.rs | 10 ++ src/mito2/src/engine/append_mode_test.rs | 1 + src/mito2/src/engine/basic_test.rs | 1 + src/mito2/src/engine/batch_catchup_test.rs | 3 + src/mito2/src/engine/batch_open_test.rs | 3 + .../engine/bump_committed_sequence_test.rs | 2 + src/mito2/src/engine/catchup_test.rs | 6 + src/mito2/src/engine/compaction_test.rs | 2 + src/mito2/src/engine/open_test.rs | 11 ++ src/mito2/src/engine/parallel_test.rs | 1 + src/mito2/src/engine/skip_wal_test.rs | 3 + src/mito2/src/engine/sync_test.rs | 2 + src/mito2/src/engine/truncate_test.rs | 2 + src/mito2/src/test_util.rs | 1 + src/mito2/src/worker/handle_open.rs | 126 +++++++++++++-- src/store-api/src/region_request.rs | 27 ++++ 29 files changed, 427 insertions(+), 71 deletions(-) diff --git a/src/cmd/src/datanode/scanbench.rs b/src/cmd/src/datanode/scanbench.rs index b26705991c..e21279d307 100644 --- a/src/cmd/src/datanode/scanbench.rs +++ b/src/cmd/src/datanode/scanbench.rs @@ -524,6 +524,7 @@ impl ScanbenchCommand { options: HashMap::default(), skip_wal_replay: !self.enable_wal, checkpoint: None, + required_capabilities: Default::default(), }; engine diff --git 
a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index ab97823a6b..3ed8c1e952 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -179,12 +179,72 @@ impl Display for OpenRegion { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "OpenRegion(region_ident={}, region_storage_path={}, reason={:?})", - self.region_ident, self.region_storage_path, self.reason + "OpenRegion(region_ident={}, region_storage_path={}, reason={:?}, required_capabilities={:?})", + self.region_ident, + self.region_storage_path, + self.reason, + self.required_capabilities() ) } } +/// Required capabilities for opening a region. +#[derive(Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct OpenRegionCapability(u8); + +impl OpenRegionCapability { + pub const OBJECT_STORAGE: Self = Self(1 << 0); + pub const REMOTE_WAL: Self = Self(1 << 1); + + pub const fn empty() -> Self { + Self(0) + } + + pub const fn contains(self, other: Self) -> bool { + (self.0 & other.0) == other.0 + } + + pub const fn bits(self) -> u8 { + self.0 + } + + pub const fn is_empty(&self) -> bool { + self.0 == 0 + } +} + +impl std::fmt::Debug for OpenRegionCapability { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if self.is_empty() { + return f.write_str("None"); + } + + let mut capabilities = Vec::new(); + if self.contains(Self::OBJECT_STORAGE) { + capabilities.push("ObjectStorage"); + } + if self.contains(Self::REMOTE_WAL) { + capabilities.push("RemoteWal"); + } + f.write_str(&capabilities.join("|")) + } +} + +impl std::ops::BitOr for OpenRegionCapability { + type Output = Self; + + fn bitor(self, rhs: Self) -> Self::Output { + Self(self.0 | rhs.0) + } +} + +impl std::ops::BitOrAssign for OpenRegionCapability { + fn bitor_assign(&mut self, rhs: Self) { + self.0 |= rhs.0; + } +} + /// The reason why a region is being opened. 
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -203,13 +263,15 @@ impl OpenRegionReason { matches!(self, Self::Normal) } - /// Returns true if this open reason requires the datanode to be backed by object storage. - /// - /// When a region is migrated or failed over to this node, the region data must reside in - /// a shared object storage that both the source and destination datanodes can access. - /// If the datanode only has local file storage, the data would not be accessible. - pub fn requires_object_storage(&self) -> bool { - matches!(self, Self::RegionMigration | Self::RegionFailover) + /// Returns the default required capabilities for this open reason. + pub fn default_required_capabilities(&self) -> OpenRegionCapability { + match self { + Self::Normal => OpenRegionCapability::empty(), + Self::RegionMigration => OpenRegionCapability::OBJECT_STORAGE, + Self::RegionFailover => { + OpenRegionCapability::OBJECT_STORAGE | OpenRegionCapability::REMOTE_WAL + } + } } } @@ -227,6 +289,11 @@ pub struct OpenRegion { /// The reason why this region is being opened. #[serde(default, skip_serializing_if = "OpenRegionReason::is_normal")] pub reason: OpenRegionReason, + /// Explicit required capabilities for opening this region. + /// + /// When empty, the receiver falls back to the default capabilities mapped from [`reason`]. + #[serde(default, skip_serializing_if = "OpenRegionCapability::is_empty")] + pub required_capabilities: OpenRegionCapability, } impl OpenRegion { @@ -244,6 +311,7 @@ impl OpenRegion { region_wal_options, skip_wal_replay, reason: OpenRegionReason::Normal, + required_capabilities: OpenRegionCapability::empty(), } } @@ -252,6 +320,24 @@ impl OpenRegion { self.reason = reason; self } + + /// Sets explicit required capabilities for opening this region. 
+ pub fn with_required_capabilities( + mut self, + required_capabilities: OpenRegionCapability, + ) -> Self { + self.required_capabilities = required_capabilities; + self + } + + /// Returns the effective required capabilities for this open request. + pub fn required_capabilities(&self) -> OpenRegionCapability { + if self.required_capabilities.is_empty() { + self.reason.default_required_capabilities() + } else { + self.required_capabilities + } + } } /// The instruction of downgrading leader region. @@ -1407,10 +1493,55 @@ mod tests { region_wal_options: HashMap::new(), skip_wal_replay: false, reason: OpenRegionReason::Normal, + required_capabilities: OpenRegionCapability::empty(), }; assert_eq!(expected, deserialized); } + #[test] + fn test_open_region_reason_default_required_capabilities() { + assert_eq!( + OpenRegionReason::Normal.default_required_capabilities(), + OpenRegionCapability::empty() + ); + assert_eq!( + OpenRegionReason::RegionMigration.default_required_capabilities(), + OpenRegionCapability::OBJECT_STORAGE + ); + assert_eq!( + OpenRegionReason::RegionFailover.default_required_capabilities(), + OpenRegionCapability::OBJECT_STORAGE | OpenRegionCapability::REMOTE_WAL + ); + } + + #[test] + fn test_open_region_required_capabilities_fallback() { + let open_region = OpenRegion::new( + RegionIdent { + datanode_id: 1, + table_id: 1024, + region_number: 1, + engine: "mito2".to_string(), + }, + "test/foo", + HashMap::new(), + HashMap::new(), + false, + ) + .with_reason(OpenRegionReason::RegionFailover); + assert_eq!( + open_region.required_capabilities(), + OpenRegionCapability::OBJECT_STORAGE | OpenRegionCapability::REMOTE_WAL + ); + + let open_region = + open_region.with_required_capabilities(OpenRegionCapability::OBJECT_STORAGE); + assert_eq!( + open_region.required_capabilities(), + OpenRegionCapability::OBJECT_STORAGE + ); + } + #[test] fn test_flush_regions_creation() { let region_id = RegionId::new(1024, 1); diff --git a/src/datanode/src/heartbeat.rs 
b/src/datanode/src/heartbeat.rs index 20ef2c1e5e..3d77251953 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -98,8 +98,7 @@ impl HeartbeatTask { Arc::new(SuspendHandler::new(region_server.suspend_state())), Arc::new( RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend) - .with_open_region_parallelism(opts.init_regions_parallelism) - .with_is_object_storage(opts.storage.is_object_storage()), + .with_open_region_parallelism(opts.init_regions_parallelism), ), Arc::new(InvalidateCacheHandler::new(cache_invalidator)), ])); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 3b54beef82..10948a3e7c 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -60,8 +60,6 @@ pub struct RegionHeartbeatResponseHandler { open_region_parallelism: usize, gc_tasks: TaskTracker, kv_backend: KvBackendRef, - /// Whether the datanode's default storage is backed by object storage. - is_object_storage: bool, } #[async_trait::async_trait] @@ -107,7 +105,6 @@ impl RegionHeartbeatResponseHandler { open_region_parallelism: (num_cpus::get() / 2).max(1), gc_tasks: TaskTracker::new(), kv_backend, - is_object_storage: false, } } @@ -117,12 +114,6 @@ impl RegionHeartbeatResponseHandler { self } - /// Sets whether the datanode is backed by object storage. 
- pub fn with_is_object_storage(mut self, is_object_storage: bool) -> Self { - self.is_object_storage = is_object_storage; - self - } - fn build_handler( &self, instruction: &Instruction, @@ -132,7 +123,6 @@ impl RegionHeartbeatResponseHandler { Instruction::OpenRegions(_) => Ok(Some(Box::new( OpenRegionsHandler { open_region_parallelism: self.open_region_parallelism, - is_object_storage: self.is_object_storage, } .into(), ))), diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 926c28af3e..393f936f16 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -14,21 +14,14 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply}; use common_meta::wal_provider::prepare_wal_options; -use common_telemetry::warn; use store_api::path_utils::table_dir; -use store_api::region_request::{PathType, RegionOpenRequest}; +use store_api::region_request::{OpenRegionCapability, PathType, RegionOpenRequest}; use store_api::storage::RegionId; use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; pub struct OpenRegionsHandler { pub open_region_parallelism: usize, - /// Whether the datanode is backed by object storage (e.g., S3, OSS, etc.). - /// - /// When `false`, region migration and failover open requests are rejected - /// because the region data only exists on the source node's local storage - /// and cannot be accessed by this node. - pub is_object_storage: bool, } #[async_trait::async_trait] @@ -39,38 +32,12 @@ impl InstructionHandler for OpenRegionsHandler { ctx: &HandlerContext, open_regions: Self::Instruction, ) -> Option<InstructionReply> { - // Check capability: if this datanode is not backed by object storage, reject - // open requests that require shared object storage (migration/failover).
- if !self.is_object_storage { - let rejected: Vec<_> = open_regions - .iter() - .filter(|r| r.reason.requires_object_storage()) - .collect(); - if let Some(first) = rejected.first() { - let reason = &first.reason; - let regions: Vec<_> = rejected - .iter() - .map(|r| r.region_ident.get_region_id()) - .collect(); - warn!( - "Rejecting open_region instruction for regions {:?}: reason={:?} requires \ - object storage, but this datanode uses local file storage.", - regions, reason, - ); - return Some(InstructionReply::OpenRegions(SimpleReply { - result: false, - error: Some(format!( - "Cannot open regions {regions:?} on a datanode without object storage: \ - open reason {:?} requires shared object storage.", - reason, - )), - })); - } - } - let requests = open_regions .into_iter() .map(|open_region| { + let required_capabilities = OpenRegionCapability::from_bits_retain( + open_region.required_capabilities().bits(), + ); let OpenRegion { region_ident, region_storage_path, @@ -88,6 +55,7 @@ impl InstructionHandler for OpenRegionsHandler { options: region_options, skip_wal_replay, checkpoint: None, + required_capabilities, }; (region_id, request) }) @@ -147,6 +115,7 @@ mod tests { region_wal_options: HashMap::new(), skip_wal_replay: false, reason: Default::default(), + required_capabilities: Default::default(), }) .collect(); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index d5711e1761..b4243eef71 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -2057,6 +2057,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -2235,6 +2236,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }, ), ( @@ -2246,6 +2248,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: 
Default::default(), }, ), ], @@ -2268,6 +2271,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }, ), ( @@ -2279,6 +2283,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }, ), ], diff --git a/src/datanode/src/utils.rs b/src/datanode/src/utils.rs index 488ddacdf0..34fc7ca7b2 100644 --- a/src/datanode/src/utils.rs +++ b/src/datanode/src/utils.rs @@ -174,6 +174,7 @@ pub(crate) async fn build_region_open_requests( options, skip_wal_replay: true, checkpoint: None, + required_capabilities: Default::default(), }, )); } diff --git a/src/file-engine/src/region.rs b/src/file-engine/src/region.rs index 3808b33a67..77ba446d33 100644 --- a/src/file-engine/src/region.rs +++ b/src/file-engine/src/region.rs @@ -181,6 +181,7 @@ mod tests { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }; let region = FileRegion::open(region_id, request, &object_store) @@ -238,6 +239,7 @@ mod tests { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }; let err = FileRegion::open(region_id, request, &object_store) .await diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 36aa39d53b..fef0223309 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -13,21 +13,24 @@ // limitations under the License. 
use std::any::Any; +use std::collections::HashMap; use std::ops::Div; use api::v1::meta::MailboxMessage; use common_meta::RegionIdent; use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::instruction::{ - Instruction, InstructionReply, OpenRegion, OpenRegionReason, SimpleReply, + Instruction, InstructionReply, OpenRegion, OpenRegionCapability, OpenRegionReason, SimpleReply, }; use common_meta::key::datanode_table::RegionInfo; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::info; use common_telemetry::tracing_context::TracingContext; +use common_wal::options::WalOptions; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::region_engine::RegionRole; +use store_api::storage::RegionNumber; use tokio::time::Instant; use crate::error::{self, Result}; @@ -62,6 +65,27 @@ impl State for OpenCandidateRegion { } impl OpenCandidateRegion { + fn required_capabilities( + open_reason: &OpenRegionReason, + region_wal_options: &HashMap<RegionNumber, String>, + region_number: RegionNumber, + ) -> Result<OpenRegionCapability> { + let mut required_capabilities = open_reason.default_required_capabilities(); + if matches!(open_reason, OpenRegionReason::RegionFailover) { + let wal_options = region_wal_options + .get(&region_number) + .map(|wal_options| serde_json::from_str::<WalOptions>(wal_options)) + .transpose() + .context(error::ParseWalOptionsSnafu)?; + + if !matches!(wal_options.unwrap_or_default(), WalOptions::Kafka(_)) { + required_capabilities = OpenRegionCapability::OBJECT_STORAGE; + } + } + + Ok(required_capabilities) + } + /// Builds open region instructions /// /// Abort(non-retry): @@ -92,6 +116,8 @@ impl OpenCandidateRegion { region_wal_options, engine, } = datanode_table_value.region_info.clone(); + let required_capabilities = + Self::required_capabilities(&open_reason, &region_wal_options, region_number)?; open_regions.push( OpenRegion::new( @@ -106,7 +132,8 @@ impl OpenCandidateRegion { region_wal_options, true, ) -
.with_reason(open_reason.clone()), + .with_reason(open_reason.clone()) + .with_required_capabilities(required_capabilities), ); } @@ -228,6 +255,7 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; + use common_wal::options::{KafkaWalOptions, WalOptions}; use store_api::storage::RegionId; use super::*; @@ -255,9 +283,50 @@ mod tests { region_wal_options: Default::default(), skip_wal_replay: true, reason: OpenRegionReason::RegionMigration, + required_capabilities: OpenRegionCapability::OBJECT_STORAGE, }]) } + #[test] + fn test_required_capabilities_for_region_migration() { + let capabilities = OpenCandidateRegion::required_capabilities( + &OpenRegionReason::RegionMigration, + &HashMap::new(), + 1, + ) + .unwrap(); + assert_eq!(capabilities, OpenRegionCapability::OBJECT_STORAGE); + } + + #[test] + fn test_required_capabilities_for_failover_with_remote_wal() { + let wal_options = serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "test-topic".to_string(), + })) + .unwrap(); + let capabilities = OpenCandidateRegion::required_capabilities( + &OpenRegionReason::RegionFailover, + &HashMap::from([(1, wal_options)]), + 1, + ) + .unwrap(); + assert_eq!( + capabilities, + OpenRegionCapability::OBJECT_STORAGE | OpenRegionCapability::REMOTE_WAL + ); + } + + #[test] + fn test_required_capabilities_for_failover_with_local_wal() { + let capabilities = OpenCandidateRegion::required_capabilities( + &OpenRegionReason::RegionFailover, + &HashMap::new(), + 1, + ) + .unwrap(); + assert_eq!(capabilities, OpenRegionCapability::OBJECT_STORAGE); + } + #[tokio::test] async fn test_datanode_table_is_not_found_error() { let state = OpenCandidateRegion; diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index ef4d802cfc..abc50dc69f 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -620,6 +620,7 @@ mod test { 
options: physical_region_option, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }; engine .handle_request(physical_region_id, RegionRequest::Open(open_request)) @@ -644,6 +645,7 @@ mod test { options: HashMap::new(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }; engine .handle_request( @@ -721,6 +723,7 @@ mod test { options: physical_region_option, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }; // Opening an already opened region should succeed. // Since the region is already open, no metadata recovery operations will be performed. @@ -749,6 +752,7 @@ mod test { options: physical_region_option, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }; let err = metric_engine .handle_request(physical_region_id, RegionRequest::Open(open_request)) @@ -854,6 +858,7 @@ mod test { options: options.clone(), skip_wal_replay: true, checkpoint: None, + required_capabilities: Default::default(), }, ) }) diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 59b1cfd928..9dbba04265 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -211,6 +211,7 @@ impl MetricEngineInner { ) -> (RegionOpenRequest, RegionOpenRequest) { let metadata_region_options = region_options_for_metadata_region(&request.options); let checkpoint = request.checkpoint; + let required_capabilities = request.required_capabilities; let open_metadata_region_request = RegionOpenRequest { table_dir: request.table_dir.clone(), @@ -222,6 +223,7 @@ impl MetricEngineInner { entry_id: checkpoint.metadata_entry_id.unwrap_or_default(), metadata_entry_id: None, }), + required_capabilities, }; let mut data_region_options = request.options; @@ -239,6 +241,7 @@ impl MetricEngineInner { entry_id: checkpoint.entry_id, metadata_entry_id: None, }), + required_capabilities, }; 
(open_metadata_region_request, open_data_region_request) diff --git a/src/metric-engine/src/engine/sync/region.rs b/src/metric-engine/src/engine/sync/region.rs index cbe6515a19..d04bfb07b9 100644 --- a/src/metric-engine/src/engine/sync/region.rs +++ b/src/metric-engine/src/engine/sync/region.rs @@ -321,6 +321,7 @@ mod tests { options: physical_region_option, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index ec55a01903..80ad16f7f1 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -144,6 +144,7 @@ impl TestEnv { options: physical_region_option, skip_wal_replay: true, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index a7798a7678..5b81f353cf 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -277,6 +277,7 @@ async fn test_alter_region_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -481,6 +482,7 @@ async fn test_put_after_alter_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -844,6 +846,7 @@ async fn test_alter_column_fulltext_options_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -979,6 +982,7 @@ async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -1248,6 +1252,7 @@ async fn test_alter_region_sst_format_with_flush() { options: 
HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -1366,6 +1371,7 @@ async fn test_alter_region_sst_format_without_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -1492,6 +1498,7 @@ async fn test_alter_region_sst_format_flat_to_pk_with_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -1610,6 +1617,7 @@ async fn test_alter_region_sst_format_flat_to_pk_without_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -1725,6 +1733,7 @@ async fn test_alter_region_append_mode_with_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -1843,6 +1852,7 @@ async fn test_alter_region_append_mode_without_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index de58e04e46..8215cb857b 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -348,6 +348,7 @@ async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool) options, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index f256f88694..3d8e205670 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -196,6 +196,7 @@ async fn test_region_replay_with_format(factory: Option, flat_f options, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) 
.await diff --git a/src/mito2/src/engine/batch_catchup_test.rs b/src/mito2/src/engine/batch_catchup_test.rs index dc0b552adc..2492a5283d 100644 --- a/src/mito2/src/engine/batch_catchup_test.rs +++ b/src/mito2/src/engine/batch_catchup_test.rs @@ -160,6 +160,7 @@ async fn test_batch_catchup_with_format(factory: Option, flat_f skip_wal_replay: true, path_type: PathType::Bare, checkpoint: None, + required_capabilities: Default::default(), }, ) }) @@ -185,6 +186,7 @@ async fn test_batch_catchup_with_format(factory: Option, flat_f metadata_entry_id: None, location_id: None, checkpoint: None, + required_capabilities: Default::default(), }, ) }) @@ -232,6 +234,7 @@ async fn test_batch_catchup_err_with_format(factory: Option, fl metadata_entry_id: None, location_id: None, checkpoint: None, + required_capabilities: Default::default(), }, ) }) diff --git a/src/mito2/src/engine/batch_open_test.rs b/src/mito2/src/engine/batch_open_test.rs index 6b16b3c120..b3164dd204 100644 --- a/src/mito2/src/engine/batch_open_test.rs +++ b/src/mito2/src/engine/batch_open_test.rs @@ -136,6 +136,7 @@ async fn test_batch_open_with_format(factory: Option, flat_form skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + required_capabilities: Default::default(), }, ) }) @@ -149,6 +150,7 @@ async fn test_batch_open_with_format(factory: Option, flat_form skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + required_capabilities: Default::default(), }, )); @@ -221,6 +223,7 @@ async fn test_batch_open_err_with_format(factory: Option, flat_ skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + required_capabilities: Default::default(), }, ) }) diff --git a/src/mito2/src/engine/bump_committed_sequence_test.rs b/src/mito2/src/engine/bump_committed_sequence_test.rs index 12db0044c5..41fc77665a 100644 --- a/src/mito2/src/engine/bump_committed_sequence_test.rs +++ b/src/mito2/src/engine/bump_committed_sequence_test.rs @@ -112,6 +112,7 @@ async fn 
test_bump_committed_sequence_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -151,6 +152,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index e10e91b51b..2484b5f18f 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -97,6 +97,7 @@ async fn test_catchup_with_last_entry_id(factory: Option) { options, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -218,6 +219,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option) { options, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -423,6 +426,7 @@ async fn test_catchup_with_manifest_update(factory: Option) { options, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -527,6 +531,7 @@ async fn open_region( skip_wal_replay, path_type: PathType::Bare, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -622,6 +627,7 @@ async fn test_local_catchup(factory: Option) { skip_wal_replay: true, path_type: PathType::Bare, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index f76e9f8bf9..3aae0739b4 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -1023,6 +1023,7 @@ async fn test_change_region_compaction_window_with_format(flat_format: bool) { options: Default::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) 
.await @@ -1125,6 +1126,7 @@ async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) { options, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 28ad1de71e..63769084b6 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -64,6 +64,7 @@ async fn test_engine_open_empty_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -110,6 +111,7 @@ async fn test_engine_open_existing_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -237,6 +239,7 @@ async fn test_engine_region_open_with_options_with_format(flat_format: bool) { options: HashMap::from([("ttl".to_string(), "4d".to_string())]), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -297,6 +300,7 @@ async fn test_engine_region_open_with_custom_store_with_format(flat_format: bool options: HashMap::from([("storage".to_string(), "Gcs".to_string())]), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -392,6 +396,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) { options: Default::default(), skip_wal_replay: true, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -431,6 +436,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) { options: Default::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -484,6 +490,7 @@ async fn test_open_region_wait_for_opening_region_ok_with_format(flat_format: bo options: HashMap::default(), skip_wal_replay: false, checkpoint: None, 
+ required_capabilities: Default::default(), }), ) .await @@ -535,6 +542,7 @@ async fn test_open_region_wait_for_opening_region_err_with_format(flat_format: b options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -691,6 +699,7 @@ async fn test_open_backfills_partition_expr_with_fetcher() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -725,6 +734,7 @@ async fn test_open_backfills_partition_expr_with_fetcher() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -766,6 +776,7 @@ async fn test_open_keeps_none_without_fetcher() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index b88a60739b..06247b83cc 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -52,6 +52,7 @@ async fn scan_in_parallel( skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/skip_wal_test.rs b/src/mito2/src/engine/skip_wal_test.rs index 97f159b8ac..706c16f539 100644 --- a/src/mito2/src/engine/skip_wal_test.rs +++ b/src/mito2/src/engine/skip_wal_test.rs @@ -87,6 +87,7 @@ async fn test_close_region_skip_wal(insert: bool) { options: request.options.clone(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -154,6 +155,7 @@ async fn test_close_follower_region_skip_wal() { options: request.options.clone(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -271,6 +273,7 @@ async fn test_close_region_after_truncate_skip_wal() { options: 
request.options, skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index 17d73b1848..ee9667f875 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -127,6 +127,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) { // Ensure the region is not replayed from the WAL. skip_wal_replay: true, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -239,6 +240,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) { // Ensure the region is not replayed from the WAL. skip_wal_replay: true, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 8c3fdad75d..463b08deff 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -323,6 +323,7 @@ async fn test_engine_truncate_reopen_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await @@ -447,6 +448,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index c4ca7cc7b3..06ad2bec8c 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -1282,6 +1282,7 @@ pub async fn reopen_region( skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + required_capabilities: Default::default(), }), ) .await diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 73bdca775c..8bb8daa8ba 100644 --- a/src/mito2/src/worker/handle_open.rs +++ 
b/src/mito2/src/worker/handle_open.rs @@ -18,38 +18,94 @@ use std::sync::Arc; use std::time::Instant; use common_telemetry::info; +use common_wal::options::WalOptions; use object_store::util::join_path; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; -use store_api::region_request::RegionOpenRequest; +use store_api::region_request::{OpenRegionCapability, RegionOpenRequest}; use store_api::storage::RegionId; use table::requests::STORAGE_KEY; use crate::error::{ - ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result, + InvalidRequestSnafu, ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, + RegionNotFoundSnafu, Result, }; use crate::region::opener::{RegionOpener, sanitize_open_request_options}; +use crate::region::options::parse_wal_options; use crate::request::OptionOutputTx; use crate::sst::location::region_dir_from_table_dir; use crate::wal::entry_distributor::WalEntryReceiver; use crate::worker::handle_drop::remove_region_dir_once; use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop}; +fn has_object_storage_capability(object_store: &object_store::ObjectStore) -> bool { + object_store.info().scheme() != object_store::services::FS_SCHEME +} + +fn has_remote_wal_capability(request: &RegionOpenRequest) -> Result { + Ok(matches!( + parse_wal_options(&request.options).context(crate::error::SerdeJsonSnafu)?, + WalOptions::Kafka(_) + )) +} + impl RegionWorkerLoop { + fn object_store_from_request( + &self, + request: &RegionOpenRequest, + ) -> Result<&object_store::ObjectStore> { + if let Some(storage_name) = request.options.get(STORAGE_KEY) { + self.object_store_manager + .find(storage_name) + .with_context(|| ObjectStoreNotFoundSnafu { + object_store: storage_name.clone(), + }) + } else { + Ok(self.object_store_manager.default_object_store()) + } + } + + fn validate_open_region_capabilities( + &self, + region_id: RegionId, + request: &RegionOpenRequest, + object_store: &object_store::ObjectStore, + ) -> 
Result<()> { + let required_capabilities = request.required_capabilities; + if required_capabilities.contains(OpenRegionCapability::OBJECT_STORAGE) + && !has_object_storage_capability(object_store) + { + return InvalidRequestSnafu { + region_id, + reason: format!( + "opening region requires ObjectStorage capability, but storage scheme is {}", + object_store.info().scheme() + ), + } + .fail(); + } + + if required_capabilities.contains(OpenRegionCapability::REMOTE_WAL) + && !has_remote_wal_capability(request)? + { + return InvalidRequestSnafu { + region_id, + reason: "opening region requires RemoteWal capability, but wal options are not remote wal" + .to_string(), + } + .fail(); + } + + Ok(()) + } + async fn check_and_cleanup_region( &self, region_id: RegionId, request: &RegionOpenRequest, ) -> Result<()> { - let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) { - self.object_store_manager - .find(storage_name) - .with_context(|| ObjectStoreNotFoundSnafu { - object_store: storage_name.clone(), - })? - } else { - self.object_store_manager.default_object_store() - }; + let object_store = self.object_store_from_request(request)?; + self.validate_open_region_capabilities(region_id, request, object_store)?; // Check if this region is pending drop. And clean the entire dir if so. 
let region_dir = region_dir_from_table_dir(&request.table_dir, region_id, request.path_type); @@ -159,3 +215,51 @@ impl RegionWorkerLoop { }); } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions}; + use object_store::ObjectStore; + use object_store::services::{Fs, Memory}; + use store_api::region_request::{OpenRegionCapability, PathType, RegionOpenRequest}; + + use super::{has_object_storage_capability, has_remote_wal_capability}; + + fn new_open_request(options: HashMap) -> RegionOpenRequest { + RegionOpenRequest { + engine: "mito2".to_string(), + table_dir: "test".to_string(), + path_type: PathType::Bare, + options, + skip_wal_replay: false, + checkpoint: None, + required_capabilities: OpenRegionCapability::empty(), + } + } + + #[test] + fn test_has_object_storage_capability() { + let dir = common_test_util::temp_dir::create_temp_dir("handle-open"); + let fs = ObjectStore::new(Fs::default().root(dir.path().to_str().unwrap())) + .unwrap() + .finish(); + assert!(!has_object_storage_capability(&fs)); + + let memory = ObjectStore::new(Memory::default()).unwrap().finish(); + assert!(has_object_storage_capability(&memory)); + } + + #[test] + fn test_has_remote_wal_capability() { + assert!(!has_remote_wal_capability(&new_open_request(HashMap::new())).unwrap()); + + let wal_options = serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "test-topic".to_string(), + })) + .unwrap(); + let request = new_open_request(HashMap::from([(WAL_OPTIONS_KEY.to_string(), wal_options)])); + assert!(has_remote_wal_capability(&request).unwrap()); + } +} diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 951abca1be..cb229e3926 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -315,6 +315,7 @@ fn make_region_open(open: OpenRequest) -> Result> options: open.options, skip_wal_replay: false, checkpoint: 
None, + required_capabilities: OpenRegionCapability::empty(), }), )]) } @@ -567,6 +568,30 @@ pub struct RegionDropRequest { } /// Open region request. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct OpenRegionCapability(u8); + +impl OpenRegionCapability { + pub const OBJECT_STORAGE: Self = Self(1 << 0); + pub const REMOTE_WAL: Self = Self(1 << 1); + + pub const fn empty() -> Self { + Self(0) + } + + pub const fn from_bits_retain(bits: u8) -> Self { + Self(bits) + } + + pub const fn contains(self, other: Self) -> bool { + (self.0 & other.0) == other.0 + } + + pub const fn is_empty(&self) -> bool { + self.0 == 0 + } +} + #[derive(Debug, Clone)] pub struct RegionOpenRequest { /// Region engine name @@ -581,6 +606,8 @@ pub struct RegionOpenRequest { pub skip_wal_replay: bool, /// Replay checkpoint. pub checkpoint: Option, + /// Required capabilities for opening the region. + pub required_capabilities: OpenRegionCapability, } #[derive(Debug, Clone, Copy, PartialEq, Eq)]