chore: check region wal provider on startup to avoid inconsistence (#5687)

This commit is contained in:
Lei, HUANG
2025-03-12 01:51:18 +08:00
committed by GitHub
parent b8070adc3a
commit 7c97fae522
3 changed files with 34 additions and 7 deletions

View File

@@ -6,7 +6,7 @@ license.workspace = true
[features]
default = []
test = ["common-test-util", "log-store", "rstest", "rstest_reuse", "rskafka"]
test = ["common-test-util", "rstest", "rstest_reuse", "rskafka"]
[lints]
workspace = true
@@ -45,7 +45,7 @@ humantime-serde.workspace = true
index.workspace = true
itertools.workspace = true
lazy_static = "1.4"
log-store = { workspace = true, optional = true }
log-store = { workspace = true }
memcomparable = "0.2"
moka = { workspace = true, features = ["sync", "future"] }
object-store.workspace = true

View File

@@ -968,6 +968,9 @@ pub enum Error {
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
#[snafu(display("Incompatible WAL provider change. This is typically caused by changing WAL provider in database config file without completely cleaning existing files. Global provider: {}, region provider: {}", global, region))]
IncompatibleWalProviderChange { global: String, region: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1114,6 +1117,8 @@ impl ErrorExt for Error {
}
ManualCompactionOverride {} => StatusCode::Cancelled,
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -14,6 +14,7 @@
//! Region opener.
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
@@ -22,6 +23,8 @@ use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
@@ -34,6 +37,7 @@ use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
Result, StaleLogEntrySnafu,
@@ -204,7 +208,7 @@ impl RegionOpener {
// Safety: must be set before calling this method.
let options = self.options.take().unwrap();
let object_store = self.object_store(&options.storage)?.clone();
let provider = self.provider(&options.wal_options);
let provider = self.provider::<S>(&options.wal_options)?;
let metadata = Arc::new(metadata);
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = self.manifest_options(config, &options)?;
@@ -297,10 +301,28 @@ impl RegionOpener {
Ok(region)
}
fn provider(&self, wal_options: &WalOptions) -> Provider {
fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
match wal_options {
WalOptions::RaftEngine => Provider::raft_engine_provider(self.region_id.as_u64()),
WalOptions::Kafka(options) => Provider::kafka_provider(options.topic.to_string()),
WalOptions::RaftEngine => {
ensure!(
TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
error::IncompatibleWalProviderChangeSnafu {
global: "`kafka`",
region: "`raft_engine`",
}
);
Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
}
WalOptions::Kafka(options) => {
ensure!(
TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
error::IncompatibleWalProviderChangeSnafu {
global: "`raft_engine`",
region: "`kafka`",
}
);
Ok(Provider::kafka_provider(options.topic.to_string()))
}
}
}
@@ -326,7 +348,7 @@ impl RegionOpener {
let metadata = manifest.metadata.clone();
let region_id = self.region_id;
let provider = self.provider(&region_options.wal_options);
let provider = self.provider::<S>(&region_options.wal_options)?;
let wal_entry_reader = self
.wal_entry_reader
.take()