diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 0af9966fc6..a5bc914999 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -345,6 +345,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to build wal options allocator"))] + BuildWalOptionsAllocator { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, } pub type Result = std::result::Result; @@ -378,7 +385,8 @@ impl ErrorExt for Error { Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), - Error::StartWalOptionsAllocator { source, .. } => source.status_code(), + Error::BuildWalOptionsAllocator { source, .. } + | Error::StartWalOptionsAllocator { source, .. } => source.status_code(), Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => { StatusCode::Internal } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index c52499eccf..93abfa9605 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -43,7 +43,7 @@ use common_meta::node_manager::NodeManagerRef; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; -use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef}; +use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef}; use common_procedure::{ProcedureInfo, ProcedureManagerRef}; use common_telemetry::info; use common_telemetry::logging::{LoggingOptions, TracingOptions}; @@ -76,10 +76,10 @@ use tokio::sync::{broadcast, RwLock}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ - BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, - InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result, - ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, - StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, + BuildCacheRegistrySnafu, BuildWalOptionsAllocatorSnafu, CreateDirSnafu, IllegalConfigSnafu, + InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, + Result, ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, + StartDatanodeSnafu, StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; @@ -569,10 +569,11 @@ impl StartCommand { .step(10) .build(), ); - let wal_options_allocator = Arc::new(WalOptionsAllocator::new( - opts.wal.clone().into(), - kv_backend.clone(), - )); + let kafka_options = opts.wal.clone().into(); + let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone()) + .await + .context(BuildWalOptionsAllocatorSnafu)?; + let wal_options_allocator = Arc::new(wal_options_allocator); let table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index b6aa57d497..ac03941bfe 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -51,10 +51,14 @@ //! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}` //! - Mapping source table's {table_id} to {flownode_id} //! - Used in `Flownode` booting. +//! //! 11. View info key: `__view_info/{view_id}` //! - The value is a [ViewInfoValue] struct; it contains the encoded logical plan. //! - This key is mainly used in constructing the view in Datanode and Frontend. //! +//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}` +//! - The key is used to mark existing topics in kafka for WAL. +//! //! All keys have related managers. The managers take care of the serialization and deserialization //! of keys and values, and the interaction with the underlying KV store backend. //! @@ -100,6 +104,7 @@ pub mod table_route; #[cfg(any(test, feature = "testing"))] pub mod test_utils; mod tombstone; +pub mod topic_name; pub(crate) mod txn_helper; pub mod view_info; @@ -158,6 +163,9 @@ pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name"; pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name"; pub const TABLE_ROUTE_PREFIX: &str = "__table_route"; pub const NODE_ADDRESS_PREFIX: &str = "__node_address"; +pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka"; +// The legacy topic key prefix is used to store the topic name in previous versions. +pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka"; /// The keys with these prefixes will be loaded into the cache when the leader starts. pub const CACHE_KEY_PREFIXES: [&str; 5] = [ @@ -223,6 +231,11 @@ lazy_static! { Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap(); } +lazy_static! { + pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex = + Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap(); +} + /// The key of metadata. pub trait MetadataKey<'a, T> { fn to_bytes(&self) -> Vec; diff --git a/src/common/meta/src/key/topic_name.rs b/src/common/meta/src/key/topic_name.rs new file mode 100644 index 0000000000..533762d999 --- /dev/null +++ b/src/common/meta/src/key/topic_name.rs @@ -0,0 +1,218 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self, Display}; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result}; +use crate::key::{ + MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX, +}; +use crate::kv_backend::memory::MemoryKvBackend; +use crate::kv_backend::txn::{Txn, TxnOp}; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::{BatchPutRequest, RangeRequest}; +use crate::rpc::KeyValue; + +#[derive(Debug, Clone, PartialEq)] +pub struct TopicNameKey<'a> { + pub topic: &'a str, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TopicNameValue; + +impl<'a> TopicNameKey<'a> { + pub fn new(topic: &'a str) -> Self { + Self { topic } + } + + pub fn gen_with_id_and_prefix(id: usize, prefix: &'a str) -> String { + format!("{}_{}", prefix, id) + } + + pub fn range_start_key() -> String { + KAFKA_TOPIC_KEY_PREFIX.to_string() + } +} + +impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result> { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidMetadataSnafu { + err_msg: format!( + "TopicNameKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + TopicNameKey::try_from(key) + } +} + +impl Display for TopicNameKey<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", KAFKA_TOPIC_KEY_PREFIX, self.topic) + } +} + +impl<'a> TryFrom<&'a str> for TopicNameKey<'a> { + type Error = Error; + + fn try_from(value: &'a str) -> Result> { + let captures = KAFKA_TOPIC_KEY_PATTERN + .captures(value) + .context(InvalidMetadataSnafu { + err_msg: format!("Invalid topic name key: {}", value), + })?; + + // Safety: pass the regex check above + Ok(TopicNameKey { + topic: captures.get(1).unwrap().as_str(), + }) + } +} + +/// Convert a key-value pair to a topic name. +fn topic_decoder(kv: &KeyValue) -> Result { + let key = TopicNameKey::from_bytes(&kv.key)?; + Ok(key.topic.to_string()) +} + +pub struct TopicNameManager { + kv_backend: KvBackendRef, +} + +impl Default for TopicNameManager { + fn default() -> Self { + Self::new(Arc::new(MemoryKvBackend::default())) + } +} + +impl TopicNameManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Update the topics in legacy format to the new format. + pub async fn update_legacy_topics(&self) -> Result<()> { + if let Some(kv) = self + .kv_backend + .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes()) + .await? + { + let topics = + serde_json::from_slice::>(&kv.value).context(DecodeJsonSnafu)?; + let mut reqs = topics + .iter() + .map(|topic| { + let key = TopicNameKey::new(topic); + TxnOp::Put(key.to_bytes(), vec![]) + }) + .collect::>(); + let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec()); + reqs.push(delete_req); + let txn = Txn::new().and_then(reqs); + self.kv_backend.txn(txn).await?; + } + Ok(()) + } + + /// Range query for topics. + /// Caution: this method returns keys as String instead of values of range query since the topics are stoired in keys. + pub async fn range(&self) -> Result> { + let prefix = TopicNameKey::range_start_key(); + let raw_prefix = prefix.as_bytes(); + let req = RangeRequest::new().with_prefix(raw_prefix); + let resp = self.kv_backend.range(req).await?; + resp.kvs + .iter() + .map(topic_decoder) + .collect::>>() + } + + /// Put topics into kvbackend. + pub async fn batch_put(&self, topic_name_keys: Vec>) -> Result<()> { + let req = BatchPutRequest { + kvs: topic_name_keys + .iter() + .map(|key| KeyValue { + key: key.to_bytes(), + value: vec![], + }) + .collect(), + prev_kv: false, + }; + self.kv_backend.batch_put(req).await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + use crate::rpc::store::PutRequest; + + #[tokio::test] + async fn test_topic_name_key_manager() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicNameManager::new(kv_backend.clone()); + + let mut all_topics = (0..16) + .map(|i| format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, i)) + .collect::>(); + all_topics.sort(); + let topic_name_keys = all_topics + .iter() + .map(|topic| TopicNameKey::new(topic)) + .collect::>(); + + manager.batch_put(topic_name_keys.clone()).await.unwrap(); + + let topics = manager.range().await.unwrap(); + assert_eq!(topics, all_topics); + + kv_backend + .put(PutRequest { + key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(), + value: serde_json::to_vec(&all_topics).unwrap(), + prev_kv: false, + }) + .await + .unwrap(); + manager.update_legacy_topics().await.unwrap(); + let res = kv_backend + .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes()) + .await + .unwrap(); + assert!(res.is_none()); + let topics = manager.range().await.unwrap(); + assert_eq!(topics, all_topics); + + let topics = manager.range().await.unwrap(); + assert_eq!(topics, all_topics); + } +} diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index 283f43b9a8..4f50c88bfc 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod kafka; +mod selector; +mod topic_creator; +mod topic_manager; +mod topic_pool; use std::collections::HashMap; use std::sync::Arc; @@ -26,35 +29,26 @@ use store_api::storage::{RegionId, RegionNumber}; use crate::error::{EncodeWalOptionsSnafu, Result}; use crate::kv_backend::KvBackendRef; use crate::leadership_notifier::LeadershipChangeListener; -use crate::wal_options_allocator::kafka::topic_manager::TopicManager as KafkaTopicManager; +use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator; +use crate::wal_options_allocator::topic_pool::KafkaTopicPool; /// Allocates wal options in region granularity. #[derive(Default)] pub enum WalOptionsAllocator { #[default] RaftEngine, - Kafka(KafkaTopicManager), + Kafka(KafkaTopicPool), } /// Arc wrapper of WalOptionsAllocator. pub type WalOptionsAllocatorRef = Arc; impl WalOptionsAllocator { - /// Creates a WalOptionsAllocator. - pub fn new(config: MetasrvWalConfig, kv_backend: KvBackendRef) -> Self { - match config { - MetasrvWalConfig::RaftEngine => Self::RaftEngine, - MetasrvWalConfig::Kafka(kafka_config) => { - Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend)) - } - } - } - /// Tries to start the allocator. pub async fn start(&self) -> Result<()> { match self { Self::RaftEngine => Ok(()), - Self::Kafka(kafka_topic_manager) => kafka_topic_manager.start().await, + Self::Kafka(kafka_topic_manager) => kafka_topic_manager.activate().await, } } @@ -111,6 +105,21 @@ impl LeadershipChangeListener for WalOptionsAllocator { } } +/// Builds a wal options allocator based on the given configuration. +pub async fn build_wal_options_allocator( + config: &MetasrvWalConfig, + kv_backend: KvBackendRef, +) -> Result { + match config { + MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine), + MetasrvWalConfig::Kafka(kafka_config) => { + let topic_creator = build_kafka_topic_creator(kafka_config).await?; + let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator); + Ok(WalOptionsAllocator::Kafka(topic_pool)) + } + } +} + /// Allocates a wal options for each region. The allocated wal options is encoded immediately. pub fn allocate_region_wal_options( regions: Vec, @@ -146,14 +155,15 @@ mod tests { use super::*; use crate::kv_backend::memory::MemoryKvBackend; - use crate::wal_options_allocator::kafka::topic_selector::RoundRobinTopicSelector; // Tests that the wal options allocator could successfully allocate raft-engine wal options. #[tokio::test] async fn test_allocator_with_raft_engine() { let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; let wal_config = MetasrvWalConfig::RaftEngine; - let allocator = WalOptionsAllocator::new(wal_config, kv_backend); + let allocator = build_wal_options_allocator(&wal_config, kv_backend) + .await + .unwrap(); allocator.start().await.unwrap(); let num_regions = 32; @@ -191,14 +201,13 @@ mod tests { ..Default::default() }; let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let mut topic_manager = KafkaTopicManager::new(config.clone(), kv_backend); - // Replaces the default topic pool with the constructed topics. - topic_manager.topic_pool.clone_from(&topics); - // Replaces the default selector with a round-robin selector without shuffled. - topic_manager.topic_selector = Arc::new(RoundRobinTopicSelector::default()); + let topic_creator = build_kafka_topic_creator(&config).await.unwrap(); + let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator); + topic_pool.topics.clone_from(&topics); + topic_pool.selector = Arc::new(selector::RoundRobinTopicSelector::default()); // Creates an options allocator. - let allocator = WalOptionsAllocator::Kafka(topic_manager); + let allocator = WalOptionsAllocator::Kafka(topic_pool); allocator.start().await.unwrap(); let num_regions = 32; diff --git a/src/common/meta/src/wal_options_allocator/kafka.rs b/src/common/meta/src/wal_options_allocator/kafka.rs deleted file mode 100644 index cc454a3c7d..0000000000 --- a/src/common/meta/src/wal_options_allocator/kafka.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod topic_manager; -pub mod topic_selector; diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs deleted file mode 100644 index 3f1ffb4c45..0000000000 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ /dev/null @@ -1,350 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashSet; -use std::sync::Arc; - -use common_telemetry::{error, info}; -use common_wal::config::kafka::MetasrvKafkaConfig; -use common_wal::TopicSelectorType; -use rskafka::client::controller::ControllerClient; -use rskafka::client::error::Error as RsKafkaError; -use rskafka::client::error::ProtocolError::TopicAlreadyExists; -use rskafka::client::partition::{Compression, UnknownTopicHandling}; -use rskafka::client::{Client, ClientBuilder}; -use rskafka::record::Record; -use rskafka::BackoffConfig; -use snafu::{ensure, ResultExt}; - -use crate::error::{ - BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu, - CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu, - ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu, -}; -use crate::kv_backend::KvBackendRef; -use crate::rpc::store::PutRequest; -use crate::wal_options_allocator::kafka::topic_selector::{ - RoundRobinTopicSelector, TopicSelectorRef, -}; - -const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/"; - -// Each topic only has one partition for now. -// The `DEFAULT_PARTITION` refers to the index of the partition. -const DEFAULT_PARTITION: i32 = 0; - -/// Manages topic initialization and selection. -pub struct TopicManager { - config: MetasrvKafkaConfig, - pub(crate) topic_pool: Vec, - pub(crate) topic_selector: TopicSelectorRef, - kv_backend: KvBackendRef, -} - -impl TopicManager { - /// Creates a new topic manager. - pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self { - // Topics should be created. - let topics = (0..config.kafka_topic.num_topics) - .map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix)) - .collect::>(); - - let selector = match config.kafka_topic.selector_type { - TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), - }; - - Self { - config, - topic_pool: topics, - topic_selector: Arc::new(selector), - kv_backend, - } - } - - /// Tries to initialize the topic manager. - /// The initializer first tries to restore persisted topics from the kv backend. - /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. - pub async fn start(&self) -> Result<()> { - // Skip creating topics. - if !self.config.auto_create_topics { - return Ok(()); - } - let num_topics = self.config.kafka_topic.num_topics; - ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); - - // Topics should be created. - let topics = &self.topic_pool; - - // Topics already created. - // There may have extra topics created but it's okay since those topics won't break topic allocation. - let created_topics = Self::restore_created_topics(&self.kv_backend) - .await? - .into_iter() - .collect::>(); - - // Creates missing topics. - let to_be_created = topics - .iter() - .enumerate() - .filter_map(|(i, topic)| { - if created_topics.contains(topic) { - return None; - } - Some(i) - }) - .collect::>(); - - if !to_be_created.is_empty() { - self.try_create_topics(topics, &to_be_created).await?; - Self::persist_created_topics(topics, &self.kv_backend).await?; - } - Ok(()) - } - - /// Tries to create topics specified by indexes in `to_be_created`. - async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> { - // Builds an kafka controller client for creating topics. - let backoff_config = BackoffConfig { - init_backoff: self.config.backoff.init, - max_backoff: self.config.backoff.max, - base: self.config.backoff.base as f64, - deadline: self.config.backoff.deadline, - }; - let broker_endpoints = - common_wal::resolve_to_ipv4(&self.config.connection.broker_endpoints) - .await - .context(ResolveKafkaEndpointSnafu)?; - let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config); - if let Some(sasl) = &self.config.connection.sasl { - builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); - }; - if let Some(tls) = &self.config.connection.tls { - builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?) - }; - let client = builder - .build() - .await - .with_context(|_| BuildKafkaClientSnafu { - broker_endpoints: self.config.connection.broker_endpoints.clone(), - })?; - - let control_client = client - .controller_client() - .context(BuildKafkaCtrlClientSnafu)?; - - // Try to create missing topics. - let tasks = to_be_created - .iter() - .map(|i| async { - self.try_create_topic(&topics[*i], &control_client).await?; - self.try_append_noop_record(&topics[*i], &client).await?; - Ok(()) - }) - .collect::>(); - futures::future::try_join_all(tasks).await.map(|_| ()) - } - - /// Selects one topic from the topic pool through the topic selector. - pub fn select(&self) -> Result<&String> { - self.topic_selector.select(&self.topic_pool) - } - - /// Selects a batch of topics from the topic pool through the topic selector. - pub fn select_batch(&self, num_topics: usize) -> Result> { - (0..num_topics) - .map(|_| self.topic_selector.select(&self.topic_pool)) - .collect() - } - - async fn try_append_noop_record(&self, topic: &String, client: &Client) -> Result<()> { - let partition_client = client - .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) - .await - .context(BuildKafkaPartitionClientSnafu { - topic, - partition: DEFAULT_PARTITION, - })?; - - partition_client - .produce( - vec![Record { - key: None, - value: None, - timestamp: chrono::Utc::now(), - headers: Default::default(), - }], - Compression::Lz4, - ) - .await - .context(ProduceRecordSnafu { topic })?; - - Ok(()) - } - - async fn try_create_topic(&self, topic: &String, client: &ControllerClient) -> Result<()> { - match client - .create_topic( - topic.clone(), - self.config.kafka_topic.num_partitions, - self.config.kafka_topic.replication_factor, - self.config.kafka_topic.create_topic_timeout.as_millis() as i32, - ) - .await - { - Ok(_) => { - info!("Successfully created topic {}", topic); - Ok(()) - } - Err(e) => { - if Self::is_topic_already_exist_err(&e) { - info!("The topic {} already exists", topic); - Ok(()) - } else { - error!("Failed to create a topic {}, error {:?}", topic, e); - Err(e).context(CreateKafkaWalTopicSnafu) - } - } - } - } - - async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result> { - kv_backend - .get(CREATED_TOPICS_KEY.as_bytes()) - .await? - .map_or_else( - || Ok(vec![]), - |key_value| serde_json::from_slice(&key_value.value).context(DecodeJsonSnafu), - ) - } - - async fn persist_created_topics(topics: &[String], kv_backend: &KvBackendRef) -> Result<()> { - let raw_topics = serde_json::to_vec(topics).context(EncodeJsonSnafu)?; - kv_backend - .put(PutRequest { - key: CREATED_TOPICS_KEY.as_bytes().to_vec(), - value: raw_topics, - prev_kv: false, - }) - .await - .map(|_| ()) - } - - fn is_topic_already_exist_err(e: &RsKafkaError) -> bool { - matches!( - e, - &RsKafkaError::ServerError { - protocol_error: TopicAlreadyExists, - .. - } - ) - } -} - -#[cfg(test)] -mod tests { - use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; - use common_wal::test_util::run_test_with_kafka_wal; - - use super::*; - use crate::kv_backend::memory::MemoryKvBackend; - - // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend. - #[tokio::test] - async fn test_restore_persisted_topics() { - let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let topic_name_prefix = "greptimedb_wal_topic"; - let num_topics = 16; - - // Constructs mock topics. - let topics = (0..num_topics) - .map(|topic| format!("{topic_name_prefix}{topic}")) - .collect::>(); - - // Persists topics to kv backend. - TopicManager::persist_created_topics(&topics, &kv_backend) - .await - .unwrap(); - - // Restores topics from kv backend. - let restored_topics = TopicManager::restore_created_topics(&kv_backend) - .await - .unwrap(); - - assert_eq!(topics, restored_topics); - } - - /// Tests that the topic manager could allocate topics correctly. - #[tokio::test] - async fn test_alloc_topics() { - run_test_with_kafka_wal(|broker_endpoints| { - Box::pin(async { - // Constructs topics that should be created. - let topics = (0..256) - .map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4())) - .collect::>(); - - // Creates a topic manager. - let kafka_topic = KafkaTopicConfig { - replication_factor: broker_endpoints.len() as i16, - ..Default::default() - }; - let config = MetasrvKafkaConfig { - connection: KafkaConnectionConfig { - broker_endpoints, - ..Default::default() - }, - kafka_topic, - ..Default::default() - }; - let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let mut manager = TopicManager::new(config.clone(), kv_backend); - // Replaces the default topic pool with the constructed topics. - manager.topic_pool.clone_from(&topics); - // Replaces the default selector with a round-robin selector without shuffled. - manager.topic_selector = Arc::new(RoundRobinTopicSelector::default()); - manager.start().await.unwrap(); - - // Selects exactly the number of `num_topics` topics one by one. - let got = (0..topics.len()) - .map(|_| manager.select().unwrap()) - .cloned() - .collect::>(); - assert_eq!(got, topics); - - // Selects exactly the number of `num_topics` topics in a batching manner. - let got = manager - .select_batch(topics.len()) - .unwrap() - .into_iter() - .map(ToString::to_string) - .collect::>(); - assert_eq!(got, topics); - - // Selects more than the number of `num_topics` topics. - let got = manager - .select_batch(2 * topics.len()) - .unwrap() - .into_iter() - .map(ToString::to_string) - .collect::>(); - let expected = vec![topics.clone(); 2] - .into_iter() - .flatten() - .collect::>(); - assert_eq!(got, expected); - }) - }) - .await; - } -} diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_selector.rs b/src/common/meta/src/wal_options_allocator/selector.rs similarity index 100% rename from src/common/meta/src/wal_options_allocator/kafka/topic_selector.rs rename to src/common/meta/src/wal_options_allocator/selector.rs diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs new file mode 100644 index 0000000000..a9d655c737 --- /dev/null +++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs @@ -0,0 +1,159 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::{error, info}; +use common_wal::config::kafka::MetasrvKafkaConfig; +use rskafka::client::error::Error as RsKafkaError; +use rskafka::client::error::ProtocolError::TopicAlreadyExists; +use rskafka::client::partition::{Compression, UnknownTopicHandling}; +use rskafka::client::{Client, ClientBuilder}; +use rskafka::record::Record; +use rskafka::BackoffConfig; +use snafu::ResultExt; + +use crate::error::{ + BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu, + CreateKafkaWalTopicSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, + TlsConfigSnafu, +}; + +// Each topic only has one partition for now. +// The `DEFAULT_PARTITION` refers to the index of the partition. +const DEFAULT_PARTITION: i32 = 0; + +/// Creates topics in kafka. +pub struct KafkaTopicCreator { + client: Client, + /// The number of partitions per topic. + num_partitions: i32, + /// The replication factor of each topic. + replication_factor: i16, + /// The timeout of topic creation in milliseconds. + create_topic_timeout: i32, +} + +impl KafkaTopicCreator { + async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> { + let controller = client + .controller_client() + .context(BuildKafkaCtrlClientSnafu)?; + match controller + .create_topic( + topic, + self.num_partitions, + self.replication_factor, + self.create_topic_timeout, + ) + .await + { + Ok(_) => { + info!("Successfully created topic {}", topic); + Ok(()) + } + Err(e) => { + if Self::is_topic_already_exist_err(&e) { + info!("The topic {} already exists", topic); + Ok(()) + } else { + error!("Failed to create a topic {}, error {:?}", topic, e); + Err(e).context(CreateKafkaWalTopicSnafu) + } + } + } + } + + async fn append_noop_record(&self, topic: &String, client: &Client) -> Result<()> { + let partition_client = client + .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) + .await + .context(BuildKafkaPartitionClientSnafu { + topic, + partition: DEFAULT_PARTITION, + })?; + + partition_client + .produce( + vec![Record { + key: None, + value: None, + timestamp: chrono::Utc::now(), + headers: Default::default(), + }], + Compression::Lz4, + ) + .await + .context(ProduceRecordSnafu { topic })?; + + Ok(()) + } + + /// Prepares topics in Kafka. + /// 1. Creates missing topics. + /// 2. Appends a noop record to each topic. + pub async fn prepare_topics(&self, topics: &[&String]) -> Result<()> { + // Try to create missing topics. + let tasks = topics + .iter() + .map(|topic| async { + self.create_topic(topic, &self.client).await?; + self.append_noop_record(topic, &self.client).await?; + Ok(()) + }) + .collect::>(); + futures::future::try_join_all(tasks).await.map(|_| ()) + } + + fn is_topic_already_exist_err(e: &RsKafkaError) -> bool { + matches!( + e, + &RsKafkaError::ServerError { + protocol_error: TopicAlreadyExists, + .. + } + ) + } +} + +pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result { + // Builds an kafka controller client for creating topics. + let backoff_config = BackoffConfig { + init_backoff: config.backoff.init, + max_backoff: config.backoff.max, + base: config.backoff.base as f64, + deadline: config.backoff.deadline, + }; + let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints) + .await + .context(ResolveKafkaEndpointSnafu)?; + let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config); + if let Some(sasl) = &config.connection.sasl { + builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); + }; + if let Some(tls) = &config.connection.tls { + builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?) + }; + let client = builder + .build() + .await + .with_context(|_| BuildKafkaClientSnafu { + broker_endpoints: config.connection.broker_endpoints.clone(), + })?; + + Ok(KafkaTopicCreator { + client, + num_partitions: config.kafka_topic.num_partitions, + replication_factor: config.kafka_topic.replication_factor, + create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32, + }) +} diff --git a/src/common/meta/src/wal_options_allocator/topic_manager.rs b/src/common/meta/src/wal_options_allocator/topic_manager.rs new file mode 100644 index 0000000000..7b677b4242 --- /dev/null +++ b/src/common/meta/src/wal_options_allocator/topic_manager.rs @@ -0,0 +1,165 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use crate::error::Result; +use crate::key::topic_name::{TopicNameKey, TopicNameManager}; +use crate::kv_backend::KvBackendRef; + +/// Manages topics in kvbackend. +/// Responsible for: +/// 1. Restores and persisting topics in kvbackend. +/// 2. Clears topics in legacy format and restores them in the new format. +pub struct KafkaTopicManager { + key_manager: TopicNameManager, +} + +impl KafkaTopicManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { + key_manager: TopicNameManager::new(kv_backend), + } + } + + async fn restore_topics(&self) -> Result> { + self.key_manager.update_legacy_topics().await?; + let topics = self.key_manager.range().await?; + Ok(topics) + } + + /// Restores topics from the key-value backend. and returns the topics that are not stored in kvbackend. + pub async fn get_topics_to_create<'a>( + &self, + all_topics: &'a [String], + ) -> Result> { + let existing_topics = self.restore_topics().await?; + let existing_topic_set = existing_topics.iter().collect::>(); + let mut topics_to_create = Vec::with_capacity(all_topics.len()); + for topic in all_topics { + if !existing_topic_set.contains(topic) { + topics_to_create.push(topic); + } + } + Ok(topics_to_create) + } + + /// Persists topics into the key-value backend. + pub async fn persist_topics(&self, topics: &[String]) -> Result<()> { + self.key_manager + .batch_put( + topics + .iter() + .map(|topic| TopicNameKey::new(topic)) + .collect(), + ) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::key::LEGACY_TOPIC_KEY_PREFIX; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::rpc::store::PutRequest; + + #[tokio::test] + async fn test_restore_legacy_persisted_topics() { + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend.clone()); + + let all_topics = (0..16) + .map(|i| format!("greptimedb_wal_topic_{}", i)) + .collect::>(); + + // No legacy topics. + let mut topics_to_be_created = topic_kvbackend_manager + .get_topics_to_create(&all_topics) + .await + .unwrap(); + topics_to_be_created.sort(); + let mut expected = all_topics.iter().collect::>(); + expected.sort(); + assert_eq!(expected, topics_to_be_created); + + // A topic pool with 16 topics stored in kvbackend in legacy format. + let topics = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]"; + let put_req = PutRequest { + key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(), + value: topics.as_bytes().to_vec(), + prev_kv: true, + }; + let res = kv_backend.put(put_req).await.unwrap(); + assert!(res.prev_kv.is_none()); + + let topics_to_be_created = topic_kvbackend_manager + .get_topics_to_create(&all_topics) + .await + .unwrap(); + assert!(topics_to_be_created.is_empty()); + + // Legacy topics should be deleted after restoring. + let legacy_topics = kv_backend + .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes()) + .await + .unwrap(); + assert!(legacy_topics.is_none()); + + // Then we can restore it from the new format. + let mut restored_topics = topic_kvbackend_manager.restore_topics().await.unwrap(); + restored_topics.sort(); + let mut expected = all_topics.clone(); + expected.sort(); + assert_eq!(expected, restored_topics); + } + + // Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend. + #[tokio::test] + async fn test_restore_persisted_topics() { + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + let topic_name_prefix = "greptimedb_wal_topic"; + let num_topics = 16; + + let all_topics = (0..num_topics) + .map(|i| format!("{}_{}", topic_name_prefix, i)) + .collect::>(); + + // Constructs mock topics. + let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend); + + let mut topics_to_be_created = topic_kvbackend_manager + .get_topics_to_create(&all_topics) + .await + .unwrap(); + topics_to_be_created.sort(); + let mut expected = all_topics.iter().collect::>(); + expected.sort(); + assert_eq!(expected, topics_to_be_created); + + // Persists topics to kv backend. + topic_kvbackend_manager + .persist_topics(&all_topics) + .await + .unwrap(); + let topics_to_be_created = topic_kvbackend_manager + .get_topics_to_create(&all_topics) + .await + .unwrap(); + assert!(topics_to_be_created.is_empty()); + } +} diff --git a/src/common/meta/src/wal_options_allocator/topic_pool.rs b/src/common/meta/src/wal_options_allocator/topic_pool.rs new file mode 100644 index 0000000000..aac0fb90af --- /dev/null +++ b/src/common/meta/src/wal_options_allocator/topic_pool.rs @@ -0,0 +1,180 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_wal::config::kafka::MetasrvKafkaConfig; +use common_wal::TopicSelectorType; +use snafu::ensure; + +use crate::error::{InvalidNumTopicsSnafu, Result}; +use crate::kv_backend::KvBackendRef; +use crate::wal_options_allocator::selector::{RoundRobinTopicSelector, TopicSelectorRef}; +use crate::wal_options_allocator::topic_creator::KafkaTopicCreator; +use crate::wal_options_allocator::topic_manager::KafkaTopicManager; + +/// Topic pool for kafka remote wal. +/// Responsible for: +/// 1. Persists topics in kvbackend. +/// 2. Creates topics in kafka. +/// 3. Selects topics for regions. +pub struct KafkaTopicPool { + pub(crate) topics: Vec, + // Manages topics in kvbackend. + topic_manager: KafkaTopicManager, + // Creates topics in kafka. + topic_creator: KafkaTopicCreator, + pub(crate) selector: TopicSelectorRef, + auto_create_topics: bool, +} + +impl KafkaTopicPool { + pub fn new( + config: &MetasrvKafkaConfig, + kvbackend: KvBackendRef, + topic_creator: KafkaTopicCreator, + ) -> Self { + let num_topics = config.kafka_topic.num_topics; + let prefix = config.kafka_topic.topic_name_prefix.clone(); + let topics = (0..num_topics) + .map(|i| format!("{}_{}", prefix, i)) + .collect(); + + let selector = match config.kafka_topic.selector_type { + TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), + }; + + let topic_manager = KafkaTopicManager::new(kvbackend); + + Self { + topics, + topic_manager, + topic_creator, + selector: Arc::new(selector), + auto_create_topics: config.auto_create_topics, + } + } + + /// Tries to activate the topic manager when metasrv becomes the leader. + /// First tries to restore persisted topics from the kv backend. + /// If not enough topics retrieved, it will try to contact the Kafka cluster and request creating more topics. + pub async fn activate(&self) -> Result<()> { + if !self.auto_create_topics { + return Ok(()); + } + + let num_topics = self.topics.len(); + ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); + + let topics_to_be_created = self + .topic_manager + .get_topics_to_create(&self.topics) + .await?; + + if !topics_to_be_created.is_empty() { + self.topic_creator + .prepare_topics(&topics_to_be_created) + .await?; + self.topic_manager.persist_topics(&self.topics).await?; + } + Ok(()) + } + + /// Selects one topic from the topic pool through the topic selector. + pub fn select(&self) -> Result<&String> { + self.selector.select(&self.topics) + } + + /// Selects a batch of topics from the topic pool through the topic selector. + pub fn select_batch(&self, num_topics: usize) -> Result> { + (0..num_topics) + .map(|_| self.selector.select(&self.topics)) + .collect() + } +} + +#[cfg(test)] +mod tests { + use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; + use common_wal::test_util::run_test_with_kafka_wal; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator; + + /// Tests that the topic manager could allocate topics correctly. + #[tokio::test] + async fn test_alloc_topics() { + run_test_with_kafka_wal(|broker_endpoints| { + Box::pin(async { + // Constructs topics that should be created. + let topics = (0..256) + .map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4())) + .collect::>(); + + // Creates a topic manager. + let kafka_topic = KafkaTopicConfig { + replication_factor: broker_endpoints.len() as i16, + ..Default::default() + }; + let config = MetasrvKafkaConfig { + connection: KafkaConnectionConfig { + broker_endpoints, + ..Default::default() + }, + kafka_topic, + ..Default::default() + }; + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + let topic_creator = build_kafka_topic_creator(&config).await.unwrap(); + let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator); + // Replaces the default topic pool with the constructed topics. + topic_pool.topics.clone_from(&topics); + // Replaces the default selector with a round-robin selector without shuffled. + topic_pool.selector = Arc::new(RoundRobinTopicSelector::default()); + topic_pool.activate().await.unwrap(); + + // Selects exactly the number of `num_topics` topics one by one. + let got = (0..topics.len()) + .map(|_| topic_pool.select().unwrap()) + .cloned() + .collect::>(); + assert_eq!(got, topics); + + // Selects exactly the number of `num_topics` topics in a batching manner. + let got = topic_pool + .select_batch(topics.len()) + .unwrap() + .into_iter() + .map(ToString::to_string) + .collect::>(); + assert_eq!(got, topics); + + // Selects more than the number of `num_topics` topics. + let got = topic_pool + .select_batch(2 * topics.len()) + .unwrap() + .into_iter() + .map(ToString::to_string) + .collect::>(); + let expected = vec![topics.clone(); 2] + .into_iter() + .flatten() + .collect::>(); + assert_eq!(got, expected); + }) + }) + .await; + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 25c949c55c..37722c1c58 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -742,6 +742,13 @@ pub enum Error { location: Location, source: common_meta::error::Error, }, + + #[snafu(display("Failed to build wal options allocator"))] + BuildWalOptionsAllocator { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, } impl Error { @@ -788,7 +795,8 @@ impl ErrorExt for Error { | Error::PeerUnavailable { .. } | Error::ExceededDeadline { .. } | Error::ChooseItems { .. } - | Error::FlowStateHandler { .. } => StatusCode::Internal, + | Error::FlowStateHandler { .. } + | Error::BuildWalOptionsAllocator { .. } => StatusCode::Internal, Error::Unsupported { .. } => StatusCode::Unsupported, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 0afaf00493..cd01d14883 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -36,7 +36,7 @@ use common_meta::node_manager::NodeManagerRef; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; -use common_meta::wal_options_allocator::WalOptionsAllocator; +use common_meta::wal_options_allocator::build_wal_options_allocator; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; @@ -44,7 +44,7 @@ use snafu::ResultExt; use super::{SelectTarget, FLOW_ID_SEQ}; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; -use crate::error::{self, Result}; +use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result}; use crate::flow_meta_alloc::FlowPeerAllocator; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::failure_handler::RegionFailureHandler; @@ -208,10 +208,10 @@ impl MetasrvBuilder { table_id: None, }; - let wal_options_allocator = Arc::new(WalOptionsAllocator::new( - options.wal.clone(), - kv_backend.clone(), - )); + let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone()) + .await + .context(BuildWalOptionsAllocatorSnafu)?; + let wal_options_allocator = Arc::new(wal_options_allocator); let is_remote_wal = wal_options_allocator.is_remote_wal(); let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { let sequence = Arc::new( diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 0b6a377418..2ad62df3e1 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -35,7 +35,7 @@ use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; -use common_meta::wal_options_allocator::WalOptionsAllocator; +use common_meta::wal_options_allocator::build_wal_options_allocator; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; @@ -190,10 +190,11 @@ impl GreptimeDbStandaloneBuilder { .step(10) .build(), ); - let wal_options_allocator = Arc::new(WalOptionsAllocator::new( - opts.wal.clone().into(), - kv_backend.clone(), - )); + let kafka_options = opts.wal.clone().into(); + let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone()) + .await + .unwrap(); + let wal_options_allocator = Arc::new(wal_options_allocator); let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(),