From 935ef9a3613ada227e39b5ebe32d67cadf36cd17 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 2 Jun 2026 14:43:23 +0800 Subject: [PATCH] feat: check open region requirements (#8194) * feat: check open region capabilities Signed-off-by: WenyXu * test: allow file region migration tests Signed-off-by: WenyXu * refactor: refine open region requirements Signed-off-by: WenyXu * refactor: use fs scheme constant Signed-off-by: WenyXu * test: cover open region requirement predicate Signed-off-by: WenyXu * fix: check file engine open requirements Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/cmd/src/datanode/scanbench.rs | 1 + src/common/meta/src/instruction.rs | 63 +++++++++++- src/datanode/src/heartbeat/handler.rs | 4 +- .../src/heartbeat/handler/open_region.rs | 35 ++++--- src/datanode/src/region_server.rs | 5 + src/datanode/src/utils.rs | 2 + src/file-engine/src/engine.rs | 72 +++++++++++++- src/file-engine/src/region.rs | 2 + .../region_migration/open_candidate_region.rs | 80 ++++++++++++++-- src/metric-engine/src/engine.rs | 5 + src/metric-engine/src/engine/open.rs | 2 + src/metric-engine/src/engine/sync/region.rs | 1 + src/metric-engine/src/test_util.rs | 1 + src/mito2/Cargo.toml | 1 + src/mito2/src/engine/alter_test.rs | 10 ++ src/mito2/src/engine/append_mode_test.rs | 1 + src/mito2/src/engine/basic_test.rs | 1 + src/mito2/src/engine/batch_catchup_test.rs | 1 + src/mito2/src/engine/batch_open_test.rs | 3 + .../engine/bump_committed_sequence_test.rs | 2 + src/mito2/src/engine/catchup_test.rs | 6 ++ src/mito2/src/engine/compaction_test.rs | 2 + src/mito2/src/engine/open_test.rs | 11 +++ src/mito2/src/engine/parallel_test.rs | 1 + src/mito2/src/engine/skip_wal_test.rs | 3 + src/mito2/src/engine/sync_test.rs | 2 + src/mito2/src/engine/truncate_test.rs | 2 + src/mito2/src/error.rs | 15 +++ src/mito2/src/region/opener.rs | 96 +++++++++++++++++-- src/mito2/src/test_util.rs | 1 + src/mito2/src/worker/handle_open.rs | 17 +++- src/object-store/src/util.rs | 
20 ++++ src/store-api/src/region_request.rs | 25 +++++ tests-integration/Cargo.toml | 2 +- 34 files changed, 456 insertions(+), 39 deletions(-) diff --git a/src/cmd/src/datanode/scanbench.rs b/src/cmd/src/datanode/scanbench.rs index b26705991c..b2a715ad31 100644 --- a/src/cmd/src/datanode/scanbench.rs +++ b/src/cmd/src/datanode/scanbench.rs @@ -524,6 +524,7 @@ impl ScanbenchCommand { options: HashMap::default(), skip_wal_replay: !self.enable_wal, checkpoint: None, + requirements: Default::default(), }; engine diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 3fa6b1bad0..6872b9ad55 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -18,7 +18,7 @@ use std::time::Duration; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use store_api::region_engine::SyncRegionFromRequest; -use store_api::region_request::RegionFlushReason; +use store_api::region_request::{RegionFlushReason, RegionRequirements}; use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber}; use strum::Display; use table::metadata::TableId; @@ -179,12 +179,24 @@ impl Display for OpenRegion { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "OpenRegion(region_ident={}, region_storage_path={})", - self.region_ident, self.region_storage_path + "OpenRegion(region_ident={}, region_storage_path={}, reason={:?})", + self.region_ident, self.region_storage_path, self.reason ) } } +/// The reason why an open region instruction is triggered. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub enum OpenRegionReason { + /// Open triggered before region migration. + RegionMigration, + /// Open triggered by region failover. + RegionFailover, + /// Open triggered when adding a follower region. 
+ #[cfg(feature = "enterprise")] + RegionFollower, +} + #[serde_with::serde_as] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct OpenRegion { @@ -196,6 +208,10 @@ pub struct OpenRegion { pub region_wal_options: HashMap, #[serde(default)] pub skip_wal_replay: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, + #[serde(default)] + pub requirements: RegionRequirements, } impl OpenRegion { @@ -205,6 +221,8 @@ impl OpenRegion { region_options: HashMap, region_wal_options: HashMap, skip_wal_replay: bool, + reason: Option, + requirements: RegionRequirements, ) -> Self { Self { region_ident, @@ -212,6 +230,8 @@ impl OpenRegion { region_options, region_wal_options, skip_wal_replay, + reason, + requirements, } } } @@ -1126,11 +1146,13 @@ mod tests { HashMap::new(), HashMap::new(), false, + None, + RegionRequirements::empty(), )]); let serialized = serde_json::to_string(&open_region).unwrap(); assert_eq!( - r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#, + r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false,"requirements":{"object_storage":false}}]}"#, serialized ); @@ -1213,6 +1235,8 @@ mod tests { HashMap::new(), HashMap::new(), false, + None, + RegionRequirements::empty(), )]); assert_eq!(open_region_instruction, open_region); @@ -1368,10 +1392,41 @@ mod tests { region_options, region_wal_options: HashMap::new(), skip_wal_replay: false, + reason: None, + requirements: RegionRequirements::empty(), }; assert_eq!(expected, deserialized); } + #[test] + fn test_serialize_open_region_with_reason_and_requirements() { + let open_region = OpenRegion::new( + RegionIdent { + datanode_id: 2, + table_id: 1024, + 
region_number: 1, + engine: "mito2".to_string(), + }, + "test/foo", + HashMap::new(), + HashMap::new(), + false, + Some(OpenRegionReason::RegionMigration), + RegionRequirements::object_storage(), + ); + + let serialized = serde_json::to_string(&open_region).unwrap(); + assert!(serialized.contains(r#""reason":"RegionMigration""#)); + assert!(serialized.contains(r#""object_storage":true"#)); + + let deserialized: OpenRegion = serde_json::from_str(&serialized).unwrap(); + assert_eq!(Some(OpenRegionReason::RegionMigration), deserialized.reason); + assert_eq!( + RegionRequirements::object_storage(), + deserialized.requirements + ); + } + #[test] fn test_flush_regions_creation() { let region_id = RegionId::new(1024, 1); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 10948a3e7c..79e0baaef3 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -313,7 +313,7 @@ mod tests { use mito2::test_util::{CreateRequestBuilder, TestEnv}; use store_api::path_utils::table_dir; use store_api::region_engine::RegionRole; - use store_api::region_request::{RegionCloseRequest, RegionRequest}; + use store_api::region_request::{RegionCloseRequest, RegionRequest, RegionRequirements}; use store_api::storage::RegionId; use tokio::sync::mpsc::{self, Receiver}; @@ -442,6 +442,8 @@ mod tests { HashMap::new(), HashMap::new(), false, + None, + RegionRequirements::empty(), )]) } diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 56c07a3efe..9c483e588d 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -14,6 +14,7 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply}; use common_meta::wal_provider::prepare_wal_options; +use common_telemetry::info; use store_api::path_utils::table_dir; use store_api::region_request::{PathType, RegionOpenRequest}; use 
store_api::storage::RegionId; @@ -41,8 +42,13 @@ impl InstructionHandler for OpenRegionsHandler { mut region_options, region_wal_options, skip_wal_replay, + reason, + requirements, } = open_region; let region_id = RegionId::new(region_ident.table_id, region_ident.region_number); + info!( + "Received open region instruction, region_id: {region_id}, reason: {reason:?}" + ); prepare_wal_options(&mut region_options, region_id, &region_wal_options); let request = RegionOpenRequest { engine: region_ident.engine, @@ -51,6 +57,7 @@ impl InstructionHandler for OpenRegionsHandler { options: region_options, skip_wal_replay, checkpoint: None, + requirements, }; (region_id, request) }) @@ -85,7 +92,7 @@ mod tests { use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; use store_api::path_utils::table_dir; - use store_api::region_request::{RegionCloseRequest, RegionRequest}; + use store_api::region_request::{RegionCloseRequest, RegionRequest, RegionRequirements}; use store_api::storage::RegionId; use crate::heartbeat::handler::RegionHeartbeatResponseHandler; @@ -98,17 +105,21 @@ mod tests { ) -> Instruction { let region_idents = region_ids .into_iter() - .map(|region_id| OpenRegion { - region_ident: RegionIdent { - datanode_id: 0, - table_id: region_id.table_id(), - region_number: region_id.region_number(), - engine: MITO_ENGINE_NAME.to_string(), - }, - region_storage_path: storage_path.to_string(), - region_options: HashMap::new(), - region_wal_options: HashMap::new(), - skip_wal_replay: false, + .map(|region_id| { + OpenRegion::new( + RegionIdent { + datanode_id: 0, + table_id: region_id.table_id(), + region_number: region_id.region_number(), + engine: MITO_ENGINE_NAME.to_string(), + }, + storage_path, + HashMap::new(), + HashMap::new(), + false, + None, + RegionRequirements::empty(), + ) }) .collect(); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index d5711e1761..e2c19a318e 100644 --- 
a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -2057,6 +2057,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -2235,6 +2236,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }, ), ( @@ -2246,6 +2248,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }, ), ], @@ -2268,6 +2271,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }, ), ( @@ -2279,6 +2283,7 @@ mod tests { options: Default::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }, ), ], diff --git a/src/datanode/src/utils.rs b/src/datanode/src/utils.rs index c5cd008c28..816ae021ba 100644 --- a/src/datanode/src/utils.rs +++ b/src/datanode/src/utils.rs @@ -175,6 +175,7 @@ pub async fn build_region_open_requests( options, skip_wal_replay: false, checkpoint, + requirements: Default::default(), }, )); } @@ -193,6 +194,7 @@ pub async fn build_region_open_requests( options, skip_wal_replay: true, checkpoint: None, + requirements: Default::default(), }, )); } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 175ebef237..2ddbb6c414 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -32,7 +32,7 @@ use store_api::region_engine::{ }; use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, - RegionRequest, + RegionRequest, RegionRequirements, }; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use tokio::sync::Mutex; @@ -186,6 +186,24 @@ struct EngineInner { type EngineInnerRef = Arc; +fn ensure_open_requirements( + requirements: RegionRequirements, + object_store: &ObjectStore, +) -> EngineResult<()> { 
+ if !requirements.object_storage { + return Ok(()); + } + + ensure!( + object_store::util::is_object_storage(object_store), + UnsupportedSnafu { + operation: "open region with object storage requirement on non-object storage" + } + ); + + Ok(()) +} + impl EngineInner { fn new(object_store: ObjectStore) -> Self { Self { @@ -289,6 +307,8 @@ impl EngineInner { return Ok(0); } + ensure_open_requirements(request.requirements, &self.object_store)?; + let res = FileRegion::open(region_id, request, &self.object_store).await; let region = res.inspect_err(|err| { error!( @@ -356,3 +376,53 @@ impl EngineInner { self.regions.read().unwrap().contains_key(&region_id) } } + +#[cfg(test)] +mod tests { + use object_store::services::{Fs, S3}; + + use super::*; + use crate::error::Error; + + fn build_fs_object_store() -> ObjectStore { + ObjectStore::new(Fs::default().root("/tmp")) + .unwrap() + .finish() + } + + fn build_s3_object_store() -> ObjectStore { + ObjectStore::new( + S3::default() + .bucket("test-bucket") + .region("us-east-1") + .disable_ec2_metadata(), + ) + .unwrap() + .finish() + } + + #[test] + fn test_empty_open_requirements_are_supported() { + ensure_open_requirements(RegionRequirements::empty(), &build_fs_object_store()).unwrap(); + } + + #[test] + fn test_object_storage_open_requirement_rejects_fs_object_store() { + let err = ensure_open_requirements( + RegionRequirements::object_storage(), + &build_fs_object_store(), + ) + .unwrap_err(); + + assert!(matches!(err, Error::Unsupported { .. 
})); + } + + #[test] + fn test_object_storage_open_requirement_accepts_s3_object_store() { + ensure_open_requirements( + RegionRequirements::object_storage(), + &build_s3_object_store(), + ) + .unwrap(); + } +} diff --git a/src/file-engine/src/region.rs b/src/file-engine/src/region.rs index 3808b33a67..aceec21aa5 100644 --- a/src/file-engine/src/region.rs +++ b/src/file-engine/src/region.rs @@ -181,6 +181,7 @@ mod tests { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }; let region = FileRegion::open(region_id, request, &object_store) @@ -238,6 +239,7 @@ mod tests { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }; let err = FileRegion::open(region_id, request, &object_store) .await diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 0c0e5de5d7..792c66bdc9 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -18,7 +18,9 @@ use std::ops::Div; use api::v1::meta::MailboxMessage; use common_meta::RegionIdent; use common_meta::distributed_time_constants::default_distributed_time_constants; -use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; +use common_meta::instruction::{ + Instruction, InstructionReply, OpenRegion, OpenRegionReason, SimpleReply, +}; use common_meta::key::datanode_table::RegionInfo; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::info; @@ -26,12 +28,13 @@ use common_telemetry::tracing_context::TracingContext; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::region_engine::RegionRole; +use store_api::region_request::RegionRequirements; use tokio::time::Instant; use crate::error::{self, Result}; use 
crate::handler::HeartbeatMailbox; use crate::procedure::region_migration::flush_leader_region::PreFlushRegion; -use crate::procedure::region_migration::{Context, State}; +use crate::procedure::region_migration::{Context, RegionMigrationTriggerReason, State}; use crate::service::mailbox::Channel; #[derive(Debug, Serialize, Deserialize)] @@ -67,6 +70,10 @@ impl OpenCandidateRegion { let region_ids = ctx.persistent_ctx.region_ids.clone(); let from_peer_id = ctx.persistent_ctx.from_peer.id; let to_peer_id = ctx.persistent_ctx.to_peer.id; + let reason = match ctx.persistent_ctx.trigger_reason { + RegionMigrationTriggerReason::Failover => OpenRegionReason::RegionFailover, + _ => OpenRegionReason::RegionMigration, + }; let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?; let mut open_regions = Vec::with_capacity(region_ids.len()); @@ -97,6 +104,8 @@ impl OpenCandidateRegion { region_options, region_wal_options, true, + Some(reason), + RegionRequirements::object_storage(), )); } @@ -233,18 +242,20 @@ mod tests { } fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction { - Instruction::OpenRegions(vec![OpenRegion { - region_ident: RegionIdent { + Instruction::OpenRegions(vec![OpenRegion::new( + RegionIdent { datanode_id, table_id: region_id.table_id(), region_number: region_id.region_number(), engine: MITO2_ENGINE.to_string(), }, - region_storage_path: "/bar/foo/region/".to_string(), - region_options: Default::default(), - region_wal_options: Default::default(), - skip_wal_replay: true, - }]) + "/bar/foo/region/", + Default::default(), + Default::default(), + true, + Some(OpenRegionReason::RegionMigration), + RegionRequirements::object_storage(), + )]) } #[tokio::test] @@ -263,6 +274,57 @@ mod tests { assert!(!err.is_retryable()); } + #[tokio::test] + async fn test_build_open_region_instruction_reason() { + let state = OpenCandidateRegion; + let mut persistent_context = new_persistent_context(); + let from_peer_id 
= persistent_context.from_peer.id; + let region_id = persistent_context.region_ids[0]; + let env = TestingEnv::new(); + + let table_info = new_test_table_info(1024); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(from_peer_id)), + ..Default::default() + }]; + env.table_metadata_manager() + .create_table_metadata( + table_info, + TableRouteValue::physical(region_routes), + HashMap::default(), + ) + .await + .unwrap(); + + let mut ctx = env + .context_factory() + .new_context(persistent_context.clone()); + let instruction = state.build_open_region_instruction(&mut ctx).await.unwrap(); + let open_regions = instruction.into_open_regions().unwrap(); + assert_eq!( + Some(OpenRegionReason::RegionMigration), + open_regions[0].reason + ); + assert_eq!( + RegionRequirements::object_storage(), + open_regions[0].requirements + ); + + persistent_context.trigger_reason = RegionMigrationTriggerReason::Failover; + let mut ctx = env.context_factory().new_context(persistent_context); + let instruction = state.build_open_region_instruction(&mut ctx).await.unwrap(); + let open_regions = instruction.into_open_regions().unwrap(); + assert_eq!( + Some(OpenRegionReason::RegionFailover), + open_regions[0].reason + ); + assert_eq!( + RegionRequirements::object_storage(), + open_regions[0].requirements + ); + } + #[tokio::test] async fn test_datanode_is_unreachable() { let state = OpenCandidateRegion; diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index ef4d802cfc..fa9ef804cc 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -620,6 +620,7 @@ mod test { options: physical_region_option, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }; engine .handle_request(physical_region_id, RegionRequest::Open(open_request)) @@ -644,6 +645,7 @@ mod test { options: HashMap::new(), skip_wal_replay: false, checkpoint: None, + requirements: 
Default::default(), }; engine .handle_request( @@ -721,6 +723,7 @@ mod test { options: physical_region_option, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }; // Opening an already opened region should succeed. // Since the region is already open, no metadata recovery operations will be performed. @@ -749,6 +752,7 @@ mod test { options: physical_region_option, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }; let err = metric_engine .handle_request(physical_region_id, RegionRequest::Open(open_request)) @@ -854,6 +858,7 @@ mod test { options: options.clone(), skip_wal_replay: true, checkpoint: None, + requirements: Default::default(), }, ) }) diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 59b1cfd928..8fcdfcd821 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -222,6 +222,7 @@ impl MetricEngineInner { entry_id: checkpoint.metadata_entry_id.unwrap_or_default(), metadata_entry_id: None, }), + requirements: request.requirements, }; let mut data_region_options = request.options; @@ -239,6 +240,7 @@ impl MetricEngineInner { entry_id: checkpoint.entry_id, metadata_entry_id: None, }), + requirements: request.requirements, }; (open_metadata_region_request, open_data_region_request) diff --git a/src/metric-engine/src/engine/sync/region.rs b/src/metric-engine/src/engine/sync/region.rs index cbe6515a19..d1f92bef64 100644 --- a/src/metric-engine/src/engine/sync/region.rs +++ b/src/metric-engine/src/engine/sync/region.rs @@ -321,6 +321,7 @@ mod tests { options: physical_region_option, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index ec55a01903..8d4a822b6b 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -144,6 +144,7 @@ impl TestEnv { options: 
physical_region_option, skip_wal_replay: true, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 99e3439879..ea281f2c32 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true default = [] test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"] testing = ["test"] +test-shared-fs-region-migration = [] enterprise = [] vector_index = ["dep:roaring", "index/vector_index"] diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index a7798a7678..b43f057ea6 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -277,6 +277,7 @@ async fn test_alter_region_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -481,6 +482,7 @@ async fn test_put_after_alter_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -844,6 +846,7 @@ async fn test_alter_column_fulltext_options_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -979,6 +982,7 @@ async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -1248,6 +1252,7 @@ async fn test_alter_region_sst_format_with_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -1366,6 +1371,7 @@ async fn test_alter_region_sst_format_without_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -1492,6 +1498,7 @@ async fn 
test_alter_region_sst_format_flat_to_pk_with_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -1610,6 +1617,7 @@ async fn test_alter_region_sst_format_flat_to_pk_without_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -1725,6 +1733,7 @@ async fn test_alter_region_append_mode_with_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -1843,6 +1852,7 @@ async fn test_alter_region_append_mode_without_flush() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index de58e04e46..188e28ccf5 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -348,6 +348,7 @@ async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool) options, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index e1e462f692..0cc122573e 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -196,6 +196,7 @@ async fn test_region_replay_with_format(factory: Option, flat_f options, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/batch_catchup_test.rs b/src/mito2/src/engine/batch_catchup_test.rs index dc0b552adc..a3808b1999 100644 --- a/src/mito2/src/engine/batch_catchup_test.rs +++ b/src/mito2/src/engine/batch_catchup_test.rs @@ -160,6 +160,7 @@ async fn test_batch_catchup_with_format(factory: Option, flat_f skip_wal_replay: true, path_type: PathType::Bare, checkpoint: None, + requirements: 
Default::default(), }, ) }) diff --git a/src/mito2/src/engine/batch_open_test.rs b/src/mito2/src/engine/batch_open_test.rs index 6b16b3c120..2522cf2f84 100644 --- a/src/mito2/src/engine/batch_open_test.rs +++ b/src/mito2/src/engine/batch_open_test.rs @@ -136,6 +136,7 @@ async fn test_batch_open_with_format(factory: Option, flat_form skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + requirements: Default::default(), }, ) }) @@ -149,6 +150,7 @@ async fn test_batch_open_with_format(factory: Option, flat_form skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + requirements: Default::default(), }, )); @@ -221,6 +223,7 @@ async fn test_batch_open_err_with_format(factory: Option, flat_ skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + requirements: Default::default(), }, ) }) diff --git a/src/mito2/src/engine/bump_committed_sequence_test.rs b/src/mito2/src/engine/bump_committed_sequence_test.rs index 12db0044c5..23a5af8865 100644 --- a/src/mito2/src/engine/bump_committed_sequence_test.rs +++ b/src/mito2/src/engine/bump_committed_sequence_test.rs @@ -112,6 +112,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -151,6 +152,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index e10e91b51b..b79a2b0625 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -97,6 +97,7 @@ async fn test_catchup_with_last_entry_id(factory: Option) { options, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -218,6 +219,7 @@ async fn 
test_catchup_with_incorrect_last_entry_id(factory: Option) { options, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -423,6 +426,7 @@ async fn test_catchup_with_manifest_update(factory: Option) { options, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -527,6 +531,7 @@ async fn open_region( skip_wal_replay, path_type: PathType::Bare, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -622,6 +627,7 @@ async fn test_local_catchup(factory: Option) { skip_wal_replay: true, path_type: PathType::Bare, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index f76e9f8bf9..fd0982b7e5 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -1023,6 +1023,7 @@ async fn test_change_region_compaction_window_with_format(flat_format: bool) { options: Default::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -1125,6 +1126,7 @@ async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) { options, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 28ad1de71e..11279954a9 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -64,6 +64,7 @@ async fn test_engine_open_empty_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -110,6 +111,7 @@ async fn test_engine_open_existing_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -237,6 +239,7 @@ async fn 
test_engine_region_open_with_options_with_format(flat_format: bool) { options: HashMap::from([("ttl".to_string(), "4d".to_string())]), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -297,6 +300,7 @@ async fn test_engine_region_open_with_custom_store_with_format(flat_format: bool options: HashMap::from([("storage".to_string(), "Gcs".to_string())]), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -392,6 +396,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) { options: Default::default(), skip_wal_replay: true, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -431,6 +436,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) { options: Default::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -484,6 +490,7 @@ async fn test_open_region_wait_for_opening_region_ok_with_format(flat_format: bo options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -535,6 +542,7 @@ async fn test_open_region_wait_for_opening_region_err_with_format(flat_format: b options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -691,6 +699,7 @@ async fn test_open_backfills_partition_expr_with_fetcher() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -725,6 +734,7 @@ async fn test_open_backfills_partition_expr_with_fetcher() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -766,6 +776,7 @@ async fn test_open_keeps_none_without_fetcher() { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git 
a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index b88a60739b..5a1354ec15 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -52,6 +52,7 @@ async fn scan_in_parallel( skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/skip_wal_test.rs b/src/mito2/src/engine/skip_wal_test.rs index 97f159b8ac..3b6cf89f07 100644 --- a/src/mito2/src/engine/skip_wal_test.rs +++ b/src/mito2/src/engine/skip_wal_test.rs @@ -87,6 +87,7 @@ async fn test_close_region_skip_wal(insert: bool) { options: request.options.clone(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -154,6 +155,7 @@ async fn test_close_follower_region_skip_wal() { options: request.options.clone(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -271,6 +273,7 @@ async fn test_close_region_after_truncate_skip_wal() { options: request.options, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index 17d73b1848..657ee868ce 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -127,6 +127,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) { // Ensure the region is not replayed from the WAL. skip_wal_replay: true, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -239,6 +240,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) { // Ensure the region is not replayed from the WAL. 
skip_wal_replay: true, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 8c3fdad75d..8c6dd023f0 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -323,6 +323,7 @@ async fn test_engine_truncate_reopen_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await @@ -447,6 +448,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) { options: HashMap::default(), skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 3571f7c0c4..2278b61669 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -916,6 +916,20 @@ pub enum Error { source: Arc, }, + #[snafu(display( + "Region {} does not satisfy open requirement '{}': {}", + region_id, + requirement, + reason + ))] + OpenRegionRequirement { + region_id: RegionId, + requirement: &'static str, + reason: &'static str, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to parse job id"))] ParseJobId { #[snafu(implicit)] @@ -1376,6 +1390,7 @@ impl ErrorExt for Error { PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, InvalidSender { .. } => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, + OpenRegionRequirement { .. } => StatusCode::InvalidArguments, DeleteSsts { .. } | DeleteIndex { .. } | DeleteIndexes { .. 
} => { StatusCode::StorageUnavailable } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 3142a87c38..412172aead 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -27,8 +27,9 @@ use futures::future::BoxFuture; use log_store::kafka::log_store::KafkaLogStore; use log_store::noop::log_store::NoopLogStore; use log_store::raft_engine::log_store::RaftEngineLogStore; +use object_store::ObjectStore; use object_store::manager::ObjectStoreManagerRef; -use object_store::util::normalize_dir; +use object_store::util::{is_object_storage, normalize_dir}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::logstore::LogStore; use store_api::logstore::provider::Provider; @@ -36,7 +37,7 @@ use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; use store_api::region_engine::RegionRole; -use store_api::region_request::PathType; +use store_api::region_request::{PathType, RegionRequirements}; use store_api::storage::{ColumnId, RegionId}; use tokio::sync::Semaphore; @@ -46,8 +47,8 @@ use crate::cache::file_cache::{FileCache, FileType, IndexKey}; use crate::config::MitoConfig; use crate::error; use crate::error::{ - EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, - Result, StaleLogEntrySnafu, + EmptyRegionDirSnafu, InvalidMetadataSnafu, InvalidRegionOptionsSnafu, ObjectStoreNotFoundSnafu, + RegionCorruptedSnafu, Result, StaleLogEntrySnafu, }; use crate::manifest::action::RegionManifest; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; @@ -206,6 +207,29 @@ impl RegionOpener { Ok(self) } + /// Ensures the current region open request satisfies its requirements. 
+ pub(crate) fn ensure_open_requirements(&self, requirements: RegionRequirements) -> Result<()> { + if !requirements.object_storage { + return Ok(()); + } + + let options = self.options.as_ref().context(InvalidRegionOptionsSnafu { + reason: "missing region options before requirement check".to_string(), + })?; + let object_store = get_object_store(&options.storage, &self.object_store_manager)?; + + ensure!( + supports_open_region_object_storage_requirement(&object_store), + error::OpenRegionRequirementSnafu { + region_id: self.region_id, + requirement: "object storage", + reason: "region data must be accessible from another datanode", + } + ); + + Ok(()) + } + /// Sets the cache manager for the region. pub(crate) fn cache(mut self, cache_manager: Option) -> Self { self.cache_manager = cache_manager; @@ -597,6 +621,21 @@ impl RegionOpener { } } +#[cfg(not(feature = "test-shared-fs-region-migration"))] +fn supports_open_region_object_storage_requirement(object_store: &ObjectStore) -> bool { + is_object_storage(object_store) +} + +#[cfg(feature = "test-shared-fs-region-migration")] +fn supports_open_region_object_storage_requirement(object_store: &ObjectStore) -> bool { + // Integration tests can configure multiple datanodes to share the same + // temporary home dir. That makes file storage accessible to all test + // datanodes, but production file storage still does not satisfy this + // requirement. + is_object_storage(object_store) + || object_store.info().scheme() == object_store::services::FS_SCHEME +} + /// Creates a version builder from a region manifest. 
pub(crate) fn version_builder_from_manifest( manifest: &RegionManifest, @@ -1172,14 +1211,17 @@ mod tests { use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array}; use datatypes::arrow::record_batch::RecordBatch; use object_store::ObjectStore; - use object_store::services::{Fs, Memory}; + use object_store::services::{Fs, Memory, S3}; use parquet::arrow::ArrowWriter; use parquet::file::metadata::KeyValue; use parquet::file::properties::WriterProperties; use store_api::region_request::PathType; use store_api::storage::{FileId, RegionId}; - use super::{preload_parquet_meta_cache_for_files, sanitize_region_options}; + use super::{ + preload_parquet_meta_cache_for_files, sanitize_region_options, + supports_open_region_object_storage_requirement, + }; use crate::cache::CacheManager; use crate::cache::file_cache::{FileType, IndexKey}; use crate::manifest::action::{RegionManifest, RemovedFilesRecord}; @@ -1207,6 +1249,48 @@ mod tests { } } + fn build_fs_object_store() -> ObjectStore { + ObjectStore::new(Fs::default().root("/tmp")) + .unwrap() + .finish() + } + + #[test] + #[cfg(not(feature = "test-shared-fs-region-migration"))] + fn test_open_requirement_rejects_fs_object_store() { + let object_store = build_fs_object_store(); + + assert!(!supports_open_region_object_storage_requirement( + &object_store + )); + } + + #[test] + #[cfg(feature = "test-shared-fs-region-migration")] + fn test_open_requirement_accepts_shared_fs_object_store_for_tests() { + let object_store = build_fs_object_store(); + + assert!(supports_open_region_object_storage_requirement( + &object_store + )); + } + + #[test] + fn test_open_requirement_accepts_s3_object_store() { + let object_store = ObjectStore::new( + S3::default() + .bucket("test-bucket") + .region("us-east-1") + .disable_ec2_metadata(), + ) + .unwrap() + .finish(); + + assert!(supports_open_region_object_storage_requirement( + &object_store + )); + } + #[test] fn test_sanitize_region_options_options_format_wins() { // Manifest 
persisted PrimaryKey, but the re-parsed options now request Flat diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index c4ca7cc7b3..711fd0a969 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -1282,6 +1282,7 @@ pub async fn reopen_region( skip_wal_replay: false, path_type: PathType::Bare, checkpoint: None, + requirements: Default::default(), }), ) .await diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 73bdca775c..a154140d98 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -87,14 +87,11 @@ impl RegionWorkerLoop { else { return; }; - if let Err(err) = self.check_and_cleanup_region(region_id, &request).await { - sender.send(Err(err)); - return; - } info!("Try to open region {}, worker: {}", region_id, self.id); sanitize_open_request_options(&mut request.options); // Open region from specific region dir. + let requirements = request.requirements; let opener = match RegionOpener::new( region_id, &request.table_dir, @@ -112,7 +109,7 @@ impl RegionWorkerLoop { .cache(Some(self.cache_manager.clone())) .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _)) .replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id)) - .parse_options(request.options) + .parse_options(request.options.clone()) { Ok(opener) => opener, Err(err) => { @@ -121,6 +118,16 @@ impl RegionWorkerLoop { } }; + if let Err(err) = opener.ensure_open_requirements(requirements) { + sender.send(Err(err)); + return; + } + + if let Err(err) = self.check_and_cleanup_region(region_id, &request).await { + sender.send(Err(err)); + return; + } + let now = Instant::now(); let regions = self.regions.clone(); let wal = self.wal.clone(); diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index 849f91b729..92f0bd7299 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -22,11 +22,17 @@ use 
opendal::layers::{ LoggingInterceptor, LoggingLayer, RetryEvent, RetryInterceptor, RetryLayer, TracingLayer, }; use opendal::raw::{AccessorInfo, HttpClient, Operation}; +use opendal::services::FS_SCHEME; use snafu::ResultExt; use crate::config::HttpClientConfig; use crate::{ObjectStore, error}; +/// Returns true if the object store is not backed by local filesystem. +pub fn is_object_storage(object_store: &ObjectStore) -> bool { + object_store.info().scheme() != FS_SCHEME +} + /// Join two paths and normalize the output dir. /// /// The output dir is always ends with `/`. e.g. @@ -249,7 +255,11 @@ impl RetryInterceptor for PrintDetailedError { #[cfg(test)] mod tests { + use opendal::services::Fs; + use super::*; + use crate::ObjectStore; + use crate::util::is_object_storage; #[test] fn test_normalize_dir() { @@ -289,4 +299,14 @@ mod tests { assert_eq!("/abc", join_path("//", "/abc")); assert_eq!("abc/def", join_path("abc/", "//def")); } + + #[test] + fn test_fs_is_not_object_storage() { + let object_store = ObjectStore::new(Fs::default().root("/tmp")) + .unwrap() + .finish(); + + assert_eq!(FS_SCHEME, object_store.info().scheme()); + assert!(!is_object_storage(&object_store)); + } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 951abca1be..abf96736ec 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -315,6 +315,7 @@ fn make_region_open(open: OpenRequest) -> Result> options: open.options, skip_wal_replay: false, checkpoint: None, + requirements: Default::default(), }), )]) } @@ -566,6 +567,28 @@ pub struct RegionDropRequest { pub partial_drop: bool, } +/// Requirements for a region request. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct RegionRequirements { + /// Whether the region data must be backed by object storage. + pub object_storage: bool, +} + +impl RegionRequirements { + /// Returns empty requirements. 
+ pub fn empty() -> Self { + Self::default() + } + + /// Returns requirements for object storage. + pub fn object_storage() -> Self { + Self { + object_storage: true, + } + } +} + /// Open region request. #[derive(Debug, Clone)] pub struct RegionOpenRequest { @@ -581,6 +604,8 @@ pub struct RegionOpenRequest { pub skip_wal_replay: bool, /// Replay checkpoint. pub checkpoint: Option, + /// Requirements for opening the region. + pub requirements: RegionRequirements, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 43850e4ed3..f51badc8d6 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -63,7 +63,7 @@ log-query = { workspace = true } loki-proto.workspace = true meta-client.workspace = true meta-srv = { workspace = true, features = ["mock"] } -mito2.workspace = true +mito2 = { workspace = true, features = ["test-shared-fs-region-migration"] } object-store.workspace = true operator = { workspace = true, features = ["testing"] } plugins.workspace = true