feat: check open region requirements (#8194)

* feat: check open region capabilities

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: allow file region migration tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: refine open region requirements

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: use fs scheme constant

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: cover open region requirement predicate

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: check file engine open requirements

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-06-02 14:43:23 +08:00
committed by GitHub
parent c9f6f67ae9
commit 935ef9a361
34 changed files with 456 additions and 39 deletions

View File

@@ -524,6 +524,7 @@ impl ScanbenchCommand {
options: HashMap::default(),
skip_wal_replay: !self.enable_wal,
checkpoint: None,
requirements: Default::default(),
};
engine

View File

@@ -18,7 +18,7 @@ use std::time::Duration;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::region_request::RegionFlushReason;
use store_api::region_request::{RegionFlushReason, RegionRequirements};
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
@@ -179,12 +179,24 @@ impl Display for OpenRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"OpenRegion(region_ident={}, region_storage_path={})",
self.region_ident, self.region_storage_path
"OpenRegion(region_ident={}, region_storage_path={}, reason={:?})",
self.region_ident, self.region_storage_path, self.reason
)
}
}
/// The reason why an open region instruction is triggered.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum OpenRegionReason {
/// Open triggered before region migration.
RegionMigration,
/// Open triggered by region failover.
RegionFailover,
/// Open triggered when adding a follower region.
#[cfg(feature = "enterprise")]
RegionFollower,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
@@ -196,6 +208,10 @@ pub struct OpenRegion {
pub region_wal_options: HashMap<RegionNumber, String>,
#[serde(default)]
pub skip_wal_replay: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<OpenRegionReason>,
#[serde(default)]
pub requirements: RegionRequirements,
}
impl OpenRegion {
@@ -205,6 +221,8 @@ impl OpenRegion {
region_options: HashMap<String, String>,
region_wal_options: HashMap<RegionNumber, String>,
skip_wal_replay: bool,
reason: Option<OpenRegionReason>,
requirements: RegionRequirements,
) -> Self {
Self {
region_ident,
@@ -212,6 +230,8 @@ impl OpenRegion {
region_options,
region_wal_options,
skip_wal_replay,
reason,
requirements,
}
}
}
@@ -1126,11 +1146,13 @@ mod tests {
HashMap::new(),
HashMap::new(),
false,
None,
RegionRequirements::empty(),
)]);
let serialized = serde_json::to_string(&open_region).unwrap();
assert_eq!(
r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false,"requirements":{"object_storage":false}}]}"#,
serialized
);
@@ -1213,6 +1235,8 @@ mod tests {
HashMap::new(),
HashMap::new(),
false,
None,
RegionRequirements::empty(),
)]);
assert_eq!(open_region_instruction, open_region);
@@ -1368,10 +1392,41 @@ mod tests {
region_options,
region_wal_options: HashMap::new(),
skip_wal_replay: false,
reason: None,
requirements: RegionRequirements::empty(),
};
assert_eq!(expected, deserialized);
}
#[test]
fn test_serialize_open_region_with_reason_and_requirements() {
let open_region = OpenRegion::new(
RegionIdent {
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
},
"test/foo",
HashMap::new(),
HashMap::new(),
false,
Some(OpenRegionReason::RegionMigration),
RegionRequirements::object_storage(),
);
let serialized = serde_json::to_string(&open_region).unwrap();
assert!(serialized.contains(r#""reason":"RegionMigration""#));
assert!(serialized.contains(r#""object_storage":true"#));
let deserialized: OpenRegion = serde_json::from_str(&serialized).unwrap();
assert_eq!(Some(OpenRegionReason::RegionMigration), deserialized.reason);
assert_eq!(
RegionRequirements::object_storage(),
deserialized.requirements
);
}
#[test]
fn test_flush_regions_creation() {
let region_id = RegionId::new(1024, 1);

View File

@@ -313,7 +313,7 @@ mod tests {
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionRole;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::region_request::{RegionCloseRequest, RegionRequest, RegionRequirements};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Receiver};
@@ -442,6 +442,8 @@ mod tests {
HashMap::new(),
HashMap::new(),
false,
None,
RegionRequirements::empty(),
)])
}

View File

@@ -14,6 +14,7 @@
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_provider::prepare_wal_options;
use common_telemetry::info;
use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest};
use store_api::storage::RegionId;
@@ -41,8 +42,13 @@ impl InstructionHandler for OpenRegionsHandler {
mut region_options,
region_wal_options,
skip_wal_replay,
reason,
requirements,
} = open_region;
let region_id = RegionId::new(region_ident.table_id, region_ident.region_number);
info!(
"Received open region instruction, region_id: {region_id}, reason: {reason:?}"
);
prepare_wal_options(&mut region_options, region_id, &region_wal_options);
let request = RegionOpenRequest {
engine: region_ident.engine,
@@ -51,6 +57,7 @@ impl InstructionHandler for OpenRegionsHandler {
options: region_options,
skip_wal_replay,
checkpoint: None,
requirements,
};
(region_id, request)
})
@@ -85,7 +92,7 @@ mod tests {
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::path_utils::table_dir;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::region_request::{RegionCloseRequest, RegionRequest, RegionRequirements};
use store_api::storage::RegionId;
use crate::heartbeat::handler::RegionHeartbeatResponseHandler;
@@ -98,17 +105,21 @@ mod tests {
) -> Instruction {
let region_idents = region_ids
.into_iter()
.map(|region_id| OpenRegion {
region_ident: RegionIdent {
datanode_id: 0,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
engine: MITO_ENGINE_NAME.to_string(),
},
region_storage_path: storage_path.to_string(),
region_options: HashMap::new(),
region_wal_options: HashMap::new(),
skip_wal_replay: false,
.map(|region_id| {
OpenRegion::new(
RegionIdent {
datanode_id: 0,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
engine: MITO_ENGINE_NAME.to_string(),
},
storage_path,
HashMap::new(),
HashMap::new(),
false,
None,
RegionRequirements::empty(),
)
})
.collect();

View File

@@ -2057,6 +2057,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -2235,6 +2236,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
},
),
(
@@ -2246,6 +2248,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
},
),
],
@@ -2268,6 +2271,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
},
),
(
@@ -2279,6 +2283,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
},
),
],

View File

@@ -175,6 +175,7 @@ pub async fn build_region_open_requests(
options,
skip_wal_replay: false,
checkpoint,
requirements: Default::default(),
},
));
}
@@ -193,6 +194,7 @@ pub async fn build_region_open_requests(
options,
skip_wal_replay: true,
checkpoint: None,
requirements: Default::default(),
},
));
}

View File

@@ -32,7 +32,7 @@ use store_api::region_engine::{
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
RegionRequest,
RegionRequest, RegionRequirements,
};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use tokio::sync::Mutex;
@@ -186,6 +186,24 @@ struct EngineInner {
type EngineInnerRef = Arc<EngineInner>;
fn ensure_open_requirements(
requirements: RegionRequirements,
object_store: &ObjectStore,
) -> EngineResult<()> {
if !requirements.object_storage {
return Ok(());
}
ensure!(
object_store::util::is_object_storage(object_store),
UnsupportedSnafu {
operation: "open region with object storage requirement on non-object storage"
}
);
Ok(())
}
impl EngineInner {
fn new(object_store: ObjectStore) -> Self {
Self {
@@ -289,6 +307,8 @@ impl EngineInner {
return Ok(0);
}
ensure_open_requirements(request.requirements, &self.object_store)?;
let res = FileRegion::open(region_id, request, &self.object_store).await;
let region = res.inspect_err(|err| {
error!(
@@ -356,3 +376,53 @@ impl EngineInner {
self.regions.read().unwrap().contains_key(&region_id)
}
}
#[cfg(test)]
mod tests {
use object_store::services::{Fs, S3};
use super::*;
use crate::error::Error;
fn build_fs_object_store() -> ObjectStore {
ObjectStore::new(Fs::default().root("/tmp"))
.unwrap()
.finish()
}
fn build_s3_object_store() -> ObjectStore {
ObjectStore::new(
S3::default()
.bucket("test-bucket")
.region("us-east-1")
.disable_ec2_metadata(),
)
.unwrap()
.finish()
}
#[test]
fn test_empty_open_requirements_are_supported() {
ensure_open_requirements(RegionRequirements::empty(), &build_fs_object_store()).unwrap();
}
#[test]
fn test_object_storage_open_requirement_rejects_fs_object_store() {
let err = ensure_open_requirements(
RegionRequirements::object_storage(),
&build_fs_object_store(),
)
.unwrap_err();
assert!(matches!(err, Error::Unsupported { .. }));
}
#[test]
fn test_object_storage_open_requirement_accepts_s3_object_store() {
ensure_open_requirements(
RegionRequirements::object_storage(),
&build_s3_object_store(),
)
.unwrap();
}
}

View File

@@ -181,6 +181,7 @@ mod tests {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
};
let region = FileRegion::open(region_id, request, &object_store)
@@ -238,6 +239,7 @@ mod tests {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
};
let err = FileRegion::open(region_id, request, &object_store)
.await

View File

@@ -18,7 +18,9 @@ use std::ops::Div;
use api::v1::meta::MailboxMessage;
use common_meta::RegionIdent;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::instruction::{
Instruction, InstructionReply, OpenRegion, OpenRegionReason, SimpleReply,
};
use common_meta::key::datanode_table::RegionInfo;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
@@ -26,12 +28,13 @@ use common_telemetry::tracing_context::TracingContext;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::region_engine::RegionRole;
use store_api::region_request::RegionRequirements;
use tokio::time::Instant;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::{Context, State};
use crate::procedure::region_migration::{Context, RegionMigrationTriggerReason, State};
use crate::service::mailbox::Channel;
#[derive(Debug, Serialize, Deserialize)]
@@ -67,6 +70,10 @@ impl OpenCandidateRegion {
let region_ids = ctx.persistent_ctx.region_ids.clone();
let from_peer_id = ctx.persistent_ctx.from_peer.id;
let to_peer_id = ctx.persistent_ctx.to_peer.id;
let reason = match ctx.persistent_ctx.trigger_reason {
RegionMigrationTriggerReason::Failover => OpenRegionReason::RegionFailover,
_ => OpenRegionReason::RegionMigration,
};
let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
let mut open_regions = Vec::with_capacity(region_ids.len());
@@ -97,6 +104,8 @@ impl OpenCandidateRegion {
region_options,
region_wal_options,
true,
Some(reason),
RegionRequirements::object_storage(),
));
}
@@ -233,18 +242,20 @@ mod tests {
}
fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
Instruction::OpenRegions(vec![OpenRegion {
region_ident: RegionIdent {
Instruction::OpenRegions(vec![OpenRegion::new(
RegionIdent {
datanode_id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
engine: MITO2_ENGINE.to_string(),
},
region_storage_path: "/bar/foo/region/".to_string(),
region_options: Default::default(),
region_wal_options: Default::default(),
skip_wal_replay: true,
}])
"/bar/foo/region/",
Default::default(),
Default::default(),
true,
Some(OpenRegionReason::RegionMigration),
RegionRequirements::object_storage(),
)])
}
#[tokio::test]
@@ -263,6 +274,57 @@ mod tests {
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_build_open_region_instruction_reason() {
let state = OpenCandidateRegion;
let mut persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let region_id = persistent_context.region_ids[0];
let env = TestingEnv::new();
let table_info = new_test_table_info(1024);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(from_peer_id)),
..Default::default()
}];
env.table_metadata_manager()
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
let mut ctx = env
.context_factory()
.new_context(persistent_context.clone());
let instruction = state.build_open_region_instruction(&mut ctx).await.unwrap();
let open_regions = instruction.into_open_regions().unwrap();
assert_eq!(
Some(OpenRegionReason::RegionMigration),
open_regions[0].reason
);
assert_eq!(
RegionRequirements::object_storage(),
open_regions[0].requirements
);
persistent_context.trigger_reason = RegionMigrationTriggerReason::Failover;
let mut ctx = env.context_factory().new_context(persistent_context);
let instruction = state.build_open_region_instruction(&mut ctx).await.unwrap();
let open_regions = instruction.into_open_regions().unwrap();
assert_eq!(
Some(OpenRegionReason::RegionFailover),
open_regions[0].reason
);
assert_eq!(
RegionRequirements::object_storage(),
open_regions[0].requirements
);
}
#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = OpenCandidateRegion;

View File

@@ -620,6 +620,7 @@ mod test {
options: physical_region_option,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
};
engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
@@ -644,6 +645,7 @@ mod test {
options: HashMap::new(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
};
engine
.handle_request(
@@ -721,6 +723,7 @@ mod test {
options: physical_region_option,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
};
// Opening an already opened region should succeed.
// Since the region is already open, no metadata recovery operations will be performed.
@@ -749,6 +752,7 @@ mod test {
options: physical_region_option,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
};
let err = metric_engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
@@ -854,6 +858,7 @@ mod test {
options: options.clone(),
skip_wal_replay: true,
checkpoint: None,
requirements: Default::default(),
},
)
})

View File

@@ -222,6 +222,7 @@ impl MetricEngineInner {
entry_id: checkpoint.metadata_entry_id.unwrap_or_default(),
metadata_entry_id: None,
}),
requirements: request.requirements,
};
let mut data_region_options = request.options;
@@ -239,6 +240,7 @@ impl MetricEngineInner {
entry_id: checkpoint.entry_id,
metadata_entry_id: None,
}),
requirements: request.requirements,
};
(open_metadata_region_request, open_data_region_request)

View File

@@ -321,6 +321,7 @@ mod tests {
options: physical_region_option,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -144,6 +144,7 @@ impl TestEnv {
options: physical_region_option,
skip_wal_replay: true,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -8,6 +8,7 @@ license.workspace = true
default = []
test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"]
testing = ["test"]
test-shared-fs-region-migration = []
enterprise = []
vector_index = ["dep:roaring", "index/vector_index"]

View File

@@ -277,6 +277,7 @@ async fn test_alter_region_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -481,6 +482,7 @@ async fn test_put_after_alter_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -844,6 +846,7 @@ async fn test_alter_column_fulltext_options_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -979,6 +982,7 @@ async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -1248,6 +1252,7 @@ async fn test_alter_region_sst_format_with_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -1366,6 +1371,7 @@ async fn test_alter_region_sst_format_without_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -1492,6 +1498,7 @@ async fn test_alter_region_sst_format_flat_to_pk_with_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -1610,6 +1617,7 @@ async fn test_alter_region_sst_format_flat_to_pk_without_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -1725,6 +1733,7 @@ async fn test_alter_region_append_mode_with_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -1843,6 +1852,7 @@ async fn test_alter_region_append_mode_without_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -348,6 +348,7 @@ async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool)
options,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -196,6 +196,7 @@ async fn test_region_replay_with_format(factory: Option<LogStoreFactory>, flat_f
options,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -160,6 +160,7 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_f
skip_wal_replay: true,
path_type: PathType::Bare,
checkpoint: None,
requirements: Default::default(),
},
)
})

View File

@@ -136,6 +136,7 @@ async fn test_batch_open_with_format(factory: Option<LogStoreFactory>, flat_form
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
requirements: Default::default(),
},
)
})
@@ -149,6 +150,7 @@ async fn test_batch_open_with_format(factory: Option<LogStoreFactory>, flat_form
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
requirements: Default::default(),
},
));
@@ -221,6 +223,7 @@ async fn test_batch_open_err_with_format(factory: Option<LogStoreFactory>, flat_
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
requirements: Default::default(),
},
)
})

View File

@@ -112,6 +112,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -151,6 +152,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -97,6 +97,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
options,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -218,6 +219,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
options,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -321,6 +323,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
options,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -423,6 +426,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
options,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -527,6 +531,7 @@ async fn open_region(
skip_wal_replay,
path_type: PathType::Bare,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -622,6 +627,7 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
skip_wal_replay: true,
path_type: PathType::Bare,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -1023,6 +1023,7 @@ async fn test_change_region_compaction_window_with_format(flat_format: bool) {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -1125,6 +1126,7 @@ async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) {
options,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -64,6 +64,7 @@ async fn test_engine_open_empty_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -110,6 +111,7 @@ async fn test_engine_open_existing_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -237,6 +239,7 @@ async fn test_engine_region_open_with_options_with_format(flat_format: bool) {
options: HashMap::from([("ttl".to_string(), "4d".to_string())]),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -297,6 +300,7 @@ async fn test_engine_region_open_with_custom_store_with_format(flat_format: bool
options: HashMap::from([("storage".to_string(), "Gcs".to_string())]),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -392,6 +396,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
options: Default::default(),
skip_wal_replay: true,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -431,6 +436,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -484,6 +490,7 @@ async fn test_open_region_wait_for_opening_region_ok_with_format(flat_format: bo
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -535,6 +542,7 @@ async fn test_open_region_wait_for_opening_region_err_with_format(flat_format: b
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -691,6 +699,7 @@ async fn test_open_backfills_partition_expr_with_fetcher() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -725,6 +734,7 @@ async fn test_open_backfills_partition_expr_with_fetcher() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -766,6 +776,7 @@ async fn test_open_keeps_none_without_fetcher() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -52,6 +52,7 @@ async fn scan_in_parallel(
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -87,6 +87,7 @@ async fn test_close_region_skip_wal(insert: bool) {
options: request.options.clone(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -154,6 +155,7 @@ async fn test_close_follower_region_skip_wal() {
options: request.options.clone(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -271,6 +273,7 @@ async fn test_close_region_after_truncate_skip_wal() {
options: request.options,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -127,6 +127,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -239,6 +240,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) {
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -323,6 +323,7 @@ async fn test_engine_truncate_reopen_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await
@@ -447,6 +448,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -916,6 +916,20 @@ pub enum Error {
source: Arc<Error>,
},
#[snafu(display(
"Region {} does not satisfy open requirement '{}': {}",
region_id,
requirement,
reason
))]
OpenRegionRequirement {
region_id: RegionId,
requirement: &'static str,
reason: &'static str,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse job id"))]
ParseJobId {
#[snafu(implicit)]
@@ -1376,6 +1390,7 @@ impl ErrorExt for Error {
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
InvalidSender { .. } => StatusCode::InvalidArguments,
InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
OpenRegionRequirement { .. } => StatusCode::InvalidArguments,
DeleteSsts { .. } | DeleteIndex { .. } | DeleteIndexes { .. } => {
StatusCode::StorageUnavailable
}

View File

@@ -27,8 +27,9 @@ use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::ObjectStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::normalize_dir;
use object_store::util::{is_object_storage, normalize_dir};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
@@ -36,7 +37,7 @@ use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::region_request::{PathType, RegionRequirements};
use store_api::storage::{ColumnId, RegionId};
use tokio::sync::Semaphore;
@@ -46,8 +47,8 @@ use crate::cache::file_cache::{FileCache, FileType, IndexKey};
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
Result, StaleLogEntrySnafu,
EmptyRegionDirSnafu, InvalidMetadataSnafu, InvalidRegionOptionsSnafu, ObjectStoreNotFoundSnafu,
RegionCorruptedSnafu, Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
@@ -206,6 +207,29 @@ impl RegionOpener {
Ok(self)
}
/// Ensures the current region open request satisfies its requirements.
pub(crate) fn ensure_open_requirements(&self, requirements: RegionRequirements) -> Result<()> {
if !requirements.object_storage {
return Ok(());
}
let options = self.options.as_ref().context(InvalidRegionOptionsSnafu {
reason: "missing region options before requirement check".to_string(),
})?;
let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
ensure!(
supports_open_region_object_storage_requirement(&object_store),
error::OpenRegionRequirementSnafu {
region_id: self.region_id,
requirement: "object storage",
reason: "region data must be accessible from another datanode",
}
);
Ok(())
}
/// Sets the cache manager for the region.
pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
self.cache_manager = cache_manager;
@@ -597,6 +621,21 @@ impl RegionOpener {
}
}
#[cfg(not(feature = "test-shared-fs-region-migration"))]
fn supports_open_region_object_storage_requirement(object_store: &ObjectStore) -> bool {
is_object_storage(object_store)
}
#[cfg(feature = "test-shared-fs-region-migration")]
fn supports_open_region_object_storage_requirement(object_store: &ObjectStore) -> bool {
// Integration tests can configure multiple datanodes to share the same
// temporary home dir. That makes file storage accessible to all test
// datanodes, but production file storage still does not satisfy this
// requirement.
is_object_storage(object_store)
|| object_store.info().scheme() == object_store::services::FS_SCHEME
}
/// Creates a version builder from a region manifest.
pub(crate) fn version_builder_from_manifest(
manifest: &RegionManifest,
@@ -1172,14 +1211,17 @@ mod tests {
use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;
use object_store::services::{Fs, Memory};
use object_store::services::{Fs, Memory, S3};
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};
use super::{preload_parquet_meta_cache_for_files, sanitize_region_options};
use super::{
preload_parquet_meta_cache_for_files, sanitize_region_options,
supports_open_region_object_storage_requirement,
};
use crate::cache::CacheManager;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
@@ -1207,6 +1249,48 @@ mod tests {
}
}
fn build_fs_object_store() -> ObjectStore {
ObjectStore::new(Fs::default().root("/tmp"))
.unwrap()
.finish()
}
#[test]
#[cfg(not(feature = "test-shared-fs-region-migration"))]
fn test_open_requirement_rejects_fs_object_store() {
let object_store = build_fs_object_store();
assert!(!supports_open_region_object_storage_requirement(
&object_store
));
}
#[test]
#[cfg(feature = "test-shared-fs-region-migration")]
fn test_open_requirement_accepts_shared_fs_object_store_for_tests() {
let object_store = build_fs_object_store();
assert!(supports_open_region_object_storage_requirement(
&object_store
));
}
#[test]
fn test_open_requirement_accepts_s3_object_store() {
let object_store = ObjectStore::new(
S3::default()
.bucket("test-bucket")
.region("us-east-1")
.disable_ec2_metadata(),
)
.unwrap()
.finish();
assert!(supports_open_region_object_storage_requirement(
&object_store
));
}
#[test]
fn test_sanitize_region_options_options_format_wins() {
// Manifest persisted PrimaryKey, but the re-parsed options now request Flat

View File

@@ -1282,6 +1282,7 @@ pub async fn reopen_region(
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
requirements: Default::default(),
}),
)
.await

View File

@@ -87,14 +87,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
else {
return;
};
if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
sender.send(Err(err));
return;
}
info!("Try to open region {}, worker: {}", region_id, self.id);
sanitize_open_request_options(&mut request.options);
// Open region from specific region dir.
let requirements = request.requirements;
let opener = match RegionOpener::new(
region_id,
&request.table_dir,
@@ -112,7 +109,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.cache(Some(self.cache_manager.clone()))
.wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
.replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id))
.parse_options(request.options)
.parse_options(request.options.clone())
{
Ok(opener) => opener,
Err(err) => {
@@ -121,6 +118,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
};
if let Err(err) = opener.ensure_open_requirements(requirements) {
sender.send(Err(err));
return;
}
if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
sender.send(Err(err));
return;
}
let now = Instant::now();
let regions = self.regions.clone();
let wal = self.wal.clone();

View File

@@ -22,11 +22,17 @@ use opendal::layers::{
LoggingInterceptor, LoggingLayer, RetryEvent, RetryInterceptor, RetryLayer, TracingLayer,
};
use opendal::raw::{AccessorInfo, HttpClient, Operation};
use opendal::services::FS_SCHEME;
use snafu::ResultExt;
use crate::config::HttpClientConfig;
use crate::{ObjectStore, error};
/// Returns true if the object store is not backed by local filesystem.
pub fn is_object_storage(object_store: &ObjectStore) -> bool {
object_store.info().scheme() != FS_SCHEME
}
/// Join two paths and normalize the output dir.
///
/// The output dir is always ends with `/`. e.g.
@@ -249,7 +255,11 @@ impl RetryInterceptor for PrintDetailedError {
#[cfg(test)]
mod tests {
use opendal::services::Fs;
use super::*;
use crate::ObjectStore;
use crate::util::is_object_storage;
#[test]
fn test_normalize_dir() {
@@ -289,4 +299,14 @@ mod tests {
assert_eq!("/abc", join_path("//", "/abc"));
assert_eq!("abc/def", join_path("abc/", "//def"));
}
#[test]
fn test_fs_is_not_object_storage() {
let object_store = ObjectStore::new(Fs::default().root("/tmp"))
.unwrap()
.finish();
assert_eq!(FS_SCHEME, object_store.info().scheme());
assert!(!is_object_storage(&object_store));
}
}

View File

@@ -315,6 +315,7 @@ fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>>
options: open.options,
skip_wal_replay: false,
checkpoint: None,
requirements: Default::default(),
}),
)])
}
@@ -566,6 +567,28 @@ pub struct RegionDropRequest {
pub partial_drop: bool,
}
/// Requirements for a region request.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionRequirements {
/// Whether the region data must be backed by object storage.
pub object_storage: bool,
}
impl RegionRequirements {
/// Returns empty requirements.
pub fn empty() -> Self {
Self::default()
}
/// Returns requirements for object storage.
pub fn object_storage() -> Self {
Self {
object_storage: true,
}
}
}
/// Open region request.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
@@ -581,6 +604,8 @@ pub struct RegionOpenRequest {
pub skip_wal_replay: bool,
/// Replay checkpoint.
pub checkpoint: Option<ReplayCheckpoint>,
/// Requirements for opening the region.
pub requirements: RegionRequirements,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]

View File

@@ -63,7 +63,7 @@ log-query = { workspace = true }
loki-proto.workspace = true
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
mito2.workspace = true
mito2 = { workspace = true, features = ["test-shared-fs-region-migration"] }
object-store.workspace = true
operator = { workspace = true, features = ["testing"] }
plugins.workspace = true