diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 11f31502b1..2de3b3923f 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -6,7 +6,7 @@ license.workspace = true [features] default = [] -test = ["common-test-util", "log-store", "rstest", "rstest_reuse", "rskafka"] +test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"] [lints] workspace = true @@ -45,7 +45,7 @@ humantime-serde.workspace = true index.workspace = true itertools.workspace = true lazy_static = "1.4" -log-store = { workspace = true, optional = true } +log-store = { workspace = true } memcomparable = "0.2" moka = { workspace = true, features = ["sync", "future"] } object-store.workspace = true diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 86b310e1ac..ecaeb0786d 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -968,6 +968,9 @@ pub enum Error { #[snafu(display("Manual compaction is override by following operations."))] ManualCompactionOverride {}, + + #[snafu(display("Incompatible WAL provider change. This is typically caused by changing WAL provider in database config file without completely cleaning existing files. Global provider: {}, region provider: {}", global, region))] + IncompatibleWalProviderChange { global: String, region: String }, } pub type Result = std::result::Result; @@ -1114,6 +1117,8 @@ impl ErrorExt for Error { } ManualCompactionOverride {} => StatusCode::Cancelled, + + IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments, } } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 2992a475ab..e984c5c5f2 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -14,6 +14,7 @@ //! Region opener. +use std::any::TypeId; use std::collections::HashMap; use std::sync::atomic::AtomicI64; use std::sync::Arc; @@ -22,6 +23,8 @@ use common_telemetry::{debug, error, info, warn}; use common_wal::options::WalOptions; use futures::future::BoxFuture; use futures::StreamExt; +use log_store::kafka::log_store::KafkaLogStore; +use log_store::raft_engine::log_store::RaftEngineLogStore; use object_store::manager::ObjectStoreManagerRef; use object_store::util::{join_dir, normalize_dir}; use snafu::{ensure, OptionExt, ResultExt}; @@ -34,6 +37,7 @@ use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; use crate::config::MitoConfig; +use crate::error; use crate::error::{ EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result, StaleLogEntrySnafu, @@ -204,7 +208,7 @@ impl RegionOpener { // Safety: must be set before calling this method. let options = self.options.take().unwrap(); let object_store = self.object_store(&options.storage)?.clone(); - let provider = self.provider(&options.wal_options); + let provider = self.provider::(&options.wal_options)?; let metadata = Arc::new(metadata); // Create a manifest manager for this region and writes regions to the manifest file. let region_manifest_options = self.manifest_options(config, &options)?; @@ -297,10 +301,28 @@ impl RegionOpener { Ok(region) } - fn provider(&self, wal_options: &WalOptions) -> Provider { + fn provider(&self, wal_options: &WalOptions) -> Result { match wal_options { - WalOptions::RaftEngine => Provider::raft_engine_provider(self.region_id.as_u64()), - WalOptions::Kafka(options) => Provider::kafka_provider(options.topic.to_string()), + WalOptions::RaftEngine => { + ensure!( + TypeId::of::() == TypeId::of::(), + error::IncompatibleWalProviderChangeSnafu { + global: "`kafka`", + region: "`raft_engine`", + } + ); + Ok(Provider::raft_engine_provider(self.region_id.as_u64())) + } + WalOptions::Kafka(options) => { + ensure!( + TypeId::of::() == TypeId::of::(), + error::IncompatibleWalProviderChangeSnafu { + global: "`raft_engine`", + region: "`kafka`", + } + ); + Ok(Provider::kafka_provider(options.topic.to_string())) + } } } @@ -326,7 +348,7 @@ impl RegionOpener { let metadata = manifest.metadata.clone(); let region_id = self.region_id; - let provider = self.provider(®ion_options.wal_options); + let provider = self.provider::(®ion_options.wal_options)?; let wal_entry_reader = self .wal_entry_reader .take()