Add open region capability plumbing

This commit is contained in:
copilot-swe-agent[bot]
2026-05-26 18:44:56 +00:00
committed by GitHub
parent 6f0e1bba4f
commit cbc8657f77
29 changed files with 427 additions and 71 deletions

View File

@@ -524,6 +524,7 @@ impl ScanbenchCommand {
options: HashMap::default(),
skip_wal_replay: !self.enable_wal,
checkpoint: None,
required_capabilities: Default::default(),
};
engine

View File

@@ -179,12 +179,72 @@ impl Display for OpenRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"OpenRegion(region_ident={}, region_storage_path={}, reason={:?})",
self.region_ident, self.region_storage_path, self.reason
"OpenRegion(region_ident={}, region_storage_path={}, reason={:?}, required_capabilities={:?})",
self.region_ident,
self.region_storage_path,
self.reason,
self.required_capabilities()
)
}
}
/// Required capabilities for opening a region.
#[derive(Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct OpenRegionCapability(u8);
impl OpenRegionCapability {
pub const OBJECT_STORAGE: Self = Self(1 << 0);
pub const REMOTE_WAL: Self = Self(1 << 1);
pub const fn empty() -> Self {
Self(0)
}
pub const fn contains(self, other: Self) -> bool {
(self.0 & other.0) == other.0
}
pub const fn bits(self) -> u8 {
self.0
}
pub const fn is_empty(&self) -> bool {
self.0 == 0
}
}
impl std::fmt::Debug for OpenRegionCapability {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.is_empty() {
return f.write_str("None");
}
let mut capabilities = Vec::new();
if self.contains(Self::OBJECT_STORAGE) {
capabilities.push("ObjectStorage");
}
if self.contains(Self::REMOTE_WAL) {
capabilities.push("RemoteWal");
}
f.write_str(&capabilities.join("|"))
}
}
impl std::ops::BitOr for OpenRegionCapability {
type Output = Self;
fn bitor(self, rhs: Self) -> Self::Output {
Self(self.0 | rhs.0)
}
}
impl std::ops::BitOrAssign for OpenRegionCapability {
fn bitor_assign(&mut self, rhs: Self) {
self.0 |= rhs.0;
}
}
/// The reason why a region is being opened.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
@@ -203,13 +263,15 @@ impl OpenRegionReason {
matches!(self, Self::Normal)
}
/// Returns true if this open reason requires the datanode to be backed by object storage.
///
/// When a region is migrated or failed over to this node, the region data must reside in
/// a shared object storage that both the source and destination datanodes can access.
/// If the datanode only has local file storage, the data would not be accessible.
pub fn requires_object_storage(&self) -> bool {
matches!(self, Self::RegionMigration | Self::RegionFailover)
/// Returns the default required capabilities for this open reason.
pub fn default_required_capabilities(&self) -> OpenRegionCapability {
match self {
Self::Normal => OpenRegionCapability::empty(),
Self::RegionMigration => OpenRegionCapability::OBJECT_STORAGE,
Self::RegionFailover => {
OpenRegionCapability::OBJECT_STORAGE | OpenRegionCapability::REMOTE_WAL
}
}
}
}
@@ -227,6 +289,11 @@ pub struct OpenRegion {
/// The reason why this region is being opened.
#[serde(default, skip_serializing_if = "OpenRegionReason::is_normal")]
pub reason: OpenRegionReason,
/// Explicit required capabilities for opening this region.
///
/// When empty, the receiver falls back to the default capabilities mapped from [`reason`].
#[serde(default, skip_serializing_if = "OpenRegionCapability::is_empty")]
pub required_capabilities: OpenRegionCapability,
}
impl OpenRegion {
@@ -244,6 +311,7 @@ impl OpenRegion {
region_wal_options,
skip_wal_replay,
reason: OpenRegionReason::Normal,
required_capabilities: OpenRegionCapability::empty(),
}
}
@@ -252,6 +320,24 @@ impl OpenRegion {
self.reason = reason;
self
}
/// Sets explicit required capabilities for opening this region.
pub fn with_required_capabilities(
mut self,
required_capabilities: OpenRegionCapability,
) -> Self {
self.required_capabilities = required_capabilities;
self
}
/// Returns the effective required capabilities for this open request.
pub fn required_capabilities(&self) -> OpenRegionCapability {
if self.required_capabilities.is_empty() {
self.reason.default_required_capabilities()
} else {
self.required_capabilities
}
}
}
/// The instruction of downgrading leader region.
@@ -1407,10 +1493,55 @@ mod tests {
region_wal_options: HashMap::new(),
skip_wal_replay: false,
reason: OpenRegionReason::Normal,
required_capabilities: OpenRegionCapability::empty(),
};
assert_eq!(expected, deserialized);
}
#[test]
fn test_open_region_reason_default_required_capabilities() {
assert_eq!(
OpenRegionReason::Normal.default_required_capabilities(),
OpenRegionCapability::empty()
);
assert_eq!(
OpenRegionReason::RegionMigration.default_required_capabilities(),
OpenRegionCapability::OBJECT_STORAGE
);
assert_eq!(
OpenRegionReason::RegionFailover.default_required_capabilities(),
OpenRegionCapability::OBJECT_STORAGE | OpenRegionCapability::REMOTE_WAL
);
}
#[test]
fn test_open_region_required_capabilities_fallback() {
let open_region = OpenRegion::new(
RegionIdent {
datanode_id: 1,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
},
"test/foo",
HashMap::new(),
HashMap::new(),
false,
)
.with_reason(OpenRegionReason::RegionFailover);
assert_eq!(
open_region.required_capabilities(),
OpenRegionCapability::OBJECT_STORAGE | OpenRegionCapability::REMOTE_WAL
);
let open_region =
open_region.with_required_capabilities(OpenRegionCapability::OBJECT_STORAGE);
assert_eq!(
open_region.required_capabilities(),
OpenRegionCapability::OBJECT_STORAGE
);
}
#[test]
fn test_flush_regions_creation() {
let region_id = RegionId::new(1024, 1);

View File

@@ -98,8 +98,7 @@ impl HeartbeatTask {
Arc::new(SuspendHandler::new(region_server.suspend_state())),
Arc::new(
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend)
.with_open_region_parallelism(opts.init_regions_parallelism)
.with_is_object_storage(opts.storage.is_object_storage()),
.with_open_region_parallelism(opts.init_regions_parallelism),
),
Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
]));

View File

@@ -60,8 +60,6 @@ pub struct RegionHeartbeatResponseHandler {
open_region_parallelism: usize,
gc_tasks: TaskTracker<GcReport>,
kv_backend: KvBackendRef,
/// Whether the datanode's default storage is backed by object storage.
is_object_storage: bool,
}
#[async_trait::async_trait]
@@ -107,7 +105,6 @@ impl RegionHeartbeatResponseHandler {
open_region_parallelism: (num_cpus::get() / 2).max(1),
gc_tasks: TaskTracker::new(),
kv_backend,
is_object_storage: false,
}
}
@@ -117,12 +114,6 @@ impl RegionHeartbeatResponseHandler {
self
}
/// Sets whether the datanode is backed by object storage.
pub fn with_is_object_storage(mut self, is_object_storage: bool) -> Self {
self.is_object_storage = is_object_storage;
self
}
fn build_handler(
&self,
instruction: &Instruction,
@@ -132,7 +123,6 @@ impl RegionHeartbeatResponseHandler {
Instruction::OpenRegions(_) => Ok(Some(Box::new(
OpenRegionsHandler {
open_region_parallelism: self.open_region_parallelism,
is_object_storage: self.is_object_storage,
}
.into(),
))),

View File

@@ -14,21 +14,14 @@
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_provider::prepare_wal_options;
use common_telemetry::warn;
use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest};
use store_api::region_request::{OpenRegionCapability, PathType, RegionOpenRequest};
use store_api::storage::RegionId;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
pub struct OpenRegionsHandler {
pub open_region_parallelism: usize,
/// Whether the datanode is backed by object storage (e.g., S3, OSS, etc.).
///
/// When `false`, region migration and failover open requests are rejected
/// because the region data only exists on the source node's local storage
/// and cannot be accessed by this node.
pub is_object_storage: bool,
}
#[async_trait::async_trait]
@@ -39,38 +32,12 @@ impl InstructionHandler for OpenRegionsHandler {
ctx: &HandlerContext,
open_regions: Self::Instruction,
) -> Option<InstructionReply> {
// Check capability: if this datanode is not backed by object storage, reject
// open requests that require shared object storage (migration/failover).
if !self.is_object_storage {
let rejected: Vec<_> = open_regions
.iter()
.filter(|r| r.reason.requires_object_storage())
.collect();
if let Some(first) = rejected.first() {
let reason = &first.reason;
let regions: Vec<_> = rejected
.iter()
.map(|r| r.region_ident.get_region_id())
.collect();
warn!(
"Rejecting open_region instruction for regions {:?}: reason={:?} requires \
object storage, but this datanode uses local file storage.",
regions, reason,
);
return Some(InstructionReply::OpenRegions(SimpleReply {
result: false,
error: Some(format!(
"Cannot open regions {regions:?} on a datanode without object storage: \
open reason {:?} requires shared object storage.",
reason,
)),
}));
}
}
let requests = open_regions
.into_iter()
.map(|open_region| {
let required_capabilities = OpenRegionCapability::from_bits_retain(
open_region.required_capabilities().bits(),
);
let OpenRegion {
region_ident,
region_storage_path,
@@ -88,6 +55,7 @@ impl InstructionHandler for OpenRegionsHandler {
options: region_options,
skip_wal_replay,
checkpoint: None,
required_capabilities,
};
(region_id, request)
})
@@ -147,6 +115,7 @@ mod tests {
region_wal_options: HashMap::new(),
skip_wal_replay: false,
reason: Default::default(),
required_capabilities: Default::default(),
})
.collect();

View File

@@ -2057,6 +2057,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -2235,6 +2236,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
},
),
(
@@ -2246,6 +2248,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
},
),
],
@@ -2268,6 +2271,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
},
),
(
@@ -2279,6 +2283,7 @@ mod tests {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
},
),
],

View File

@@ -174,6 +174,7 @@ pub(crate) async fn build_region_open_requests(
options,
skip_wal_replay: true,
checkpoint: None,
required_capabilities: Default::default(),
},
));
}

View File

@@ -181,6 +181,7 @@ mod tests {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
};
let region = FileRegion::open(region_id, request, &object_store)
@@ -238,6 +239,7 @@ mod tests {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
};
let err = FileRegion::open(region_id, request, &object_store)
.await

View File

@@ -13,21 +13,24 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::ops::Div;
use api::v1::meta::MailboxMessage;
use common_meta::RegionIdent;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{
Instruction, InstructionReply, OpenRegion, OpenRegionReason, SimpleReply,
Instruction, InstructionReply, OpenRegion, OpenRegionCapability, OpenRegionReason, SimpleReply,
};
use common_meta::key::datanode_table::RegionInfo;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use common_wal::options::WalOptions;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionNumber;
use tokio::time::Instant;
use crate::error::{self, Result};
@@ -62,6 +65,27 @@ impl State for OpenCandidateRegion {
}
impl OpenCandidateRegion {
fn required_capabilities(
open_reason: &OpenRegionReason,
region_wal_options: &HashMap<RegionNumber, String>,
region_number: RegionNumber,
) -> Result<OpenRegionCapability> {
let mut required_capabilities = open_reason.default_required_capabilities();
if matches!(open_reason, OpenRegionReason::RegionFailover) {
let wal_options = region_wal_options
.get(&region_number)
.map(|wal_options| serde_json::from_str::<WalOptions>(wal_options))
.transpose()
.context(error::ParseWalOptionsSnafu)?;
if !matches!(wal_options.unwrap_or_default(), WalOptions::Kafka(_)) {
required_capabilities = OpenRegionCapability::OBJECT_STORAGE;
}
}
Ok(required_capabilities)
}
/// Builds open region instructions
///
/// Abort(non-retry):
@@ -92,6 +116,8 @@ impl OpenCandidateRegion {
region_wal_options,
engine,
} = datanode_table_value.region_info.clone();
let required_capabilities =
Self::required_capabilities(&open_reason, &region_wal_options, region_number)?;
open_regions.push(
OpenRegion::new(
@@ -106,7 +132,8 @@ impl OpenCandidateRegion {
region_wal_options,
true,
)
.with_reason(open_reason.clone()),
.with_reason(open_reason.clone())
.with_required_capabilities(required_capabilities),
);
}
@@ -228,6 +255,7 @@ mod tests {
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_wal::options::{KafkaWalOptions, WalOptions};
use store_api::storage::RegionId;
use super::*;
@@ -255,9 +283,50 @@ mod tests {
region_wal_options: Default::default(),
skip_wal_replay: true,
reason: OpenRegionReason::RegionMigration,
required_capabilities: OpenRegionCapability::OBJECT_STORAGE,
}])
}
#[test]
fn test_required_capabilities_for_region_migration() {
let capabilities = OpenCandidateRegion::required_capabilities(
&OpenRegionReason::RegionMigration,
&HashMap::new(),
1,
)
.unwrap();
assert_eq!(capabilities, OpenRegionCapability::OBJECT_STORAGE);
}
#[test]
fn test_required_capabilities_for_failover_with_remote_wal() {
let wal_options = serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "test-topic".to_string(),
}))
.unwrap();
let capabilities = OpenCandidateRegion::required_capabilities(
&OpenRegionReason::RegionFailover,
&HashMap::from([(1, wal_options)]),
1,
)
.unwrap();
assert_eq!(
capabilities,
OpenRegionCapability::OBJECT_STORAGE | OpenRegionCapability::REMOTE_WAL
);
}
#[test]
fn test_required_capabilities_for_failover_with_local_wal() {
let capabilities = OpenCandidateRegion::required_capabilities(
&OpenRegionReason::RegionFailover,
&HashMap::new(),
1,
)
.unwrap();
assert_eq!(capabilities, OpenRegionCapability::OBJECT_STORAGE);
}
#[tokio::test]
async fn test_datanode_table_is_not_found_error() {
let state = OpenCandidateRegion;

View File

@@ -620,6 +620,7 @@ mod test {
options: physical_region_option,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
};
engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
@@ -644,6 +645,7 @@ mod test {
options: HashMap::new(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
};
engine
.handle_request(
@@ -721,6 +723,7 @@ mod test {
options: physical_region_option,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
};
// Opening an already opened region should succeed.
// Since the region is already open, no metadata recovery operations will be performed.
@@ -749,6 +752,7 @@ mod test {
options: physical_region_option,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
};
let err = metric_engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
@@ -854,6 +858,7 @@ mod test {
options: options.clone(),
skip_wal_replay: true,
checkpoint: None,
required_capabilities: Default::default(),
},
)
})

View File

@@ -211,6 +211,7 @@ impl MetricEngineInner {
) -> (RegionOpenRequest, RegionOpenRequest) {
let metadata_region_options = region_options_for_metadata_region(&request.options);
let checkpoint = request.checkpoint;
let required_capabilities = request.required_capabilities;
let open_metadata_region_request = RegionOpenRequest {
table_dir: request.table_dir.clone(),
@@ -222,6 +223,7 @@ impl MetricEngineInner {
entry_id: checkpoint.metadata_entry_id.unwrap_or_default(),
metadata_entry_id: None,
}),
required_capabilities,
};
let mut data_region_options = request.options;
@@ -239,6 +241,7 @@ impl MetricEngineInner {
entry_id: checkpoint.entry_id,
metadata_entry_id: None,
}),
required_capabilities,
};
(open_metadata_region_request, open_data_region_request)

View File

@@ -321,6 +321,7 @@ mod tests {
options: physical_region_option,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -144,6 +144,7 @@ impl TestEnv {
options: physical_region_option,
skip_wal_replay: true,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -277,6 +277,7 @@ async fn test_alter_region_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -481,6 +482,7 @@ async fn test_put_after_alter_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -844,6 +846,7 @@ async fn test_alter_column_fulltext_options_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -979,6 +982,7 @@ async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -1248,6 +1252,7 @@ async fn test_alter_region_sst_format_with_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -1366,6 +1371,7 @@ async fn test_alter_region_sst_format_without_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -1492,6 +1498,7 @@ async fn test_alter_region_sst_format_flat_to_pk_with_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -1610,6 +1617,7 @@ async fn test_alter_region_sst_format_flat_to_pk_without_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -1725,6 +1733,7 @@ async fn test_alter_region_append_mode_with_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -1843,6 +1852,7 @@ async fn test_alter_region_append_mode_without_flush() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -348,6 +348,7 @@ async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool)
options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -196,6 +196,7 @@ async fn test_region_replay_with_format(factory: Option<LogStoreFactory>, flat_f
options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -160,6 +160,7 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_f
skip_wal_replay: true,
path_type: PathType::Bare,
checkpoint: None,
required_capabilities: Default::default(),
},
)
})
@@ -185,6 +186,7 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_f
metadata_entry_id: None,
location_id: None,
checkpoint: None,
required_capabilities: Default::default(),
},
)
})
@@ -232,6 +234,7 @@ async fn test_batch_catchup_err_with_format(factory: Option<LogStoreFactory>, fl
metadata_entry_id: None,
location_id: None,
checkpoint: None,
required_capabilities: Default::default(),
},
)
})

View File

@@ -136,6 +136,7 @@ async fn test_batch_open_with_format(factory: Option<LogStoreFactory>, flat_form
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
required_capabilities: Default::default(),
},
)
})
@@ -149,6 +150,7 @@ async fn test_batch_open_with_format(factory: Option<LogStoreFactory>, flat_form
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
required_capabilities: Default::default(),
},
));
@@ -221,6 +223,7 @@ async fn test_batch_open_err_with_format(factory: Option<LogStoreFactory>, flat_
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
required_capabilities: Default::default(),
},
)
})

View File

@@ -112,6 +112,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -151,6 +152,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -97,6 +97,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -218,6 +219,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -321,6 +323,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -423,6 +426,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -527,6 +531,7 @@ async fn open_region(
skip_wal_replay,
path_type: PathType::Bare,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -622,6 +627,7 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
skip_wal_replay: true,
path_type: PathType::Bare,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -1023,6 +1023,7 @@ async fn test_change_region_compaction_window_with_format(flat_format: bool) {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -1125,6 +1126,7 @@ async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) {
options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -64,6 +64,7 @@ async fn test_engine_open_empty_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -110,6 +111,7 @@ async fn test_engine_open_existing_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -237,6 +239,7 @@ async fn test_engine_region_open_with_options_with_format(flat_format: bool) {
options: HashMap::from([("ttl".to_string(), "4d".to_string())]),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -297,6 +300,7 @@ async fn test_engine_region_open_with_custom_store_with_format(flat_format: bool
options: HashMap::from([("storage".to_string(), "Gcs".to_string())]),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -392,6 +396,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
options: Default::default(),
skip_wal_replay: true,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -431,6 +436,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
options: Default::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -484,6 +490,7 @@ async fn test_open_region_wait_for_opening_region_ok_with_format(flat_format: bo
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -535,6 +542,7 @@ async fn test_open_region_wait_for_opening_region_err_with_format(flat_format: b
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -691,6 +699,7 @@ async fn test_open_backfills_partition_expr_with_fetcher() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -725,6 +734,7 @@ async fn test_open_backfills_partition_expr_with_fetcher() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -766,6 +776,7 @@ async fn test_open_keeps_none_without_fetcher() {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -52,6 +52,7 @@ async fn scan_in_parallel(
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -87,6 +87,7 @@ async fn test_close_region_skip_wal(insert: bool) {
options: request.options.clone(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -154,6 +155,7 @@ async fn test_close_follower_region_skip_wal() {
options: request.options.clone(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -271,6 +273,7 @@ async fn test_close_region_after_truncate_skip_wal() {
options: request.options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -127,6 +127,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -239,6 +240,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) {
// Ensure the region is not replayed from the WAL.
skip_wal_replay: true,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -323,6 +323,7 @@ async fn test_engine_truncate_reopen_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await
@@ -447,6 +448,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) {
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -1282,6 +1282,7 @@ pub async fn reopen_region(
skip_wal_replay: false,
path_type: PathType::Bare,
checkpoint: None,
required_capabilities: Default::default(),
}),
)
.await

View File

@@ -18,38 +18,94 @@ use std::sync::Arc;
use std::time::Instant;
use common_telemetry::info;
use common_wal::options::WalOptions;
use object_store::util::join_path;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_request::RegionOpenRequest;
use store_api::region_request::{OpenRegionCapability, RegionOpenRequest};
use store_api::storage::RegionId;
use table::requests::STORAGE_KEY;
use crate::error::{
ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
InvalidRequestSnafu, ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu,
RegionNotFoundSnafu, Result,
};
use crate::region::opener::{RegionOpener, sanitize_open_request_options};
use crate::region::options::parse_wal_options;
use crate::request::OptionOutputTx;
use crate::sst::location::region_dir_from_table_dir;
use crate::wal::entry_distributor::WalEntryReceiver;
use crate::worker::handle_drop::remove_region_dir_once;
use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop};
fn has_object_storage_capability(object_store: &object_store::ObjectStore) -> bool {
object_store.info().scheme() != object_store::services::FS_SCHEME
}
fn has_remote_wal_capability(request: &RegionOpenRequest) -> Result<bool> {
Ok(matches!(
parse_wal_options(&request.options).context(crate::error::SerdeJsonSnafu)?,
WalOptions::Kafka(_)
))
}
impl<S: LogStore> RegionWorkerLoop<S> {
fn object_store_from_request(
&self,
request: &RegionOpenRequest,
) -> Result<&object_store::ObjectStore> {
if let Some(storage_name) = request.options.get(STORAGE_KEY) {
self.object_store_manager
.find(storage_name)
.with_context(|| ObjectStoreNotFoundSnafu {
object_store: storage_name.clone(),
})
} else {
Ok(self.object_store_manager.default_object_store())
}
}
fn validate_open_region_capabilities(
&self,
region_id: RegionId,
request: &RegionOpenRequest,
object_store: &object_store::ObjectStore,
) -> Result<()> {
let required_capabilities = request.required_capabilities;
if required_capabilities.contains(OpenRegionCapability::OBJECT_STORAGE)
&& !has_object_storage_capability(object_store)
{
return InvalidRequestSnafu {
region_id,
reason: format!(
"opening region requires ObjectStorage capability, but storage scheme is {}",
object_store.info().scheme()
),
}
.fail();
}
if required_capabilities.contains(OpenRegionCapability::REMOTE_WAL)
&& !has_remote_wal_capability(request)?
{
return InvalidRequestSnafu {
region_id,
reason: "opening region requires RemoteWal capability, but wal options are not remote wal"
.to_string(),
}
.fail();
}
Ok(())
}
async fn check_and_cleanup_region(
&self,
region_id: RegionId,
request: &RegionOpenRequest,
) -> Result<()> {
let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) {
self.object_store_manager
.find(storage_name)
.with_context(|| ObjectStoreNotFoundSnafu {
object_store: storage_name.clone(),
})?
} else {
self.object_store_manager.default_object_store()
};
let object_store = self.object_store_from_request(request)?;
self.validate_open_region_capabilities(region_id, request, object_store)?;
// Check if this region is pending drop. And clean the entire dir if so.
let region_dir =
region_dir_from_table_dir(&request.table_dir, region_id, request.path_type);
@@ -159,3 +215,51 @@ impl<S: LogStore> RegionWorkerLoop<S> {
});
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
use object_store::ObjectStore;
use object_store::services::{Fs, Memory};
use store_api::region_request::{OpenRegionCapability, PathType, RegionOpenRequest};
use super::{has_object_storage_capability, has_remote_wal_capability};
fn new_open_request(options: HashMap<String, String>) -> RegionOpenRequest {
RegionOpenRequest {
engine: "mito2".to_string(),
table_dir: "test".to_string(),
path_type: PathType::Bare,
options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: OpenRegionCapability::empty(),
}
}
#[test]
fn test_has_object_storage_capability() {
let dir = common_test_util::temp_dir::create_temp_dir("handle-open");
let fs = ObjectStore::new(Fs::default().root(dir.path().to_str().unwrap()))
.unwrap()
.finish();
assert!(!has_object_storage_capability(&fs));
let memory = ObjectStore::new(Memory::default()).unwrap().finish();
assert!(has_object_storage_capability(&memory));
}
#[test]
fn test_has_remote_wal_capability() {
assert!(!has_remote_wal_capability(&new_open_request(HashMap::new())).unwrap());
let wal_options = serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "test-topic".to_string(),
}))
.unwrap();
let request = new_open_request(HashMap::from([(WAL_OPTIONS_KEY.to_string(), wal_options)]));
assert!(has_remote_wal_capability(&request).unwrap());
}
}

View File

@@ -315,6 +315,7 @@ fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>>
options: open.options,
skip_wal_replay: false,
checkpoint: None,
required_capabilities: OpenRegionCapability::empty(),
}),
)])
}
@@ -567,6 +568,30 @@ pub struct RegionDropRequest {
}
/// Open region request.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct OpenRegionCapability(u8);
impl OpenRegionCapability {
pub const OBJECT_STORAGE: Self = Self(1 << 0);
pub const REMOTE_WAL: Self = Self(1 << 1);
pub const fn empty() -> Self {
Self(0)
}
pub const fn from_bits_retain(bits: u8) -> Self {
Self(bits)
}
pub const fn contains(self, other: Self) -> bool {
(self.0 & other.0) == other.0
}
pub const fn is_empty(&self) -> bool {
self.0 == 0
}
}
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
/// Region engine name
@@ -581,6 +606,8 @@ pub struct RegionOpenRequest {
pub skip_wal_replay: bool,
/// Replay checkpoint.
pub checkpoint: Option<ReplayCheckpoint>,
/// Required capabilities for opening the region.
pub required_capabilities: OpenRegionCapability,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]