refactor: use MetadataKey for kafka topic (#5351)

* refactor: use MetadataKey

* fix: match all prefix

* refactor: introduce TopicPool

* fix: fix test, some rename

* test: add unit test for legacy restore

* fix: add _ between prefix and topic id

* chore: readable legacy topics

* refactor: a refactor

* Apply suggestions from code review

* Apply suggestions from code review

* refactor: introduce TopicPool

* fix: fix unit test

* chore: fix unit test and add some comments

* fix: fix unit test

* refactor: just refactor

* refactor: rename

* chore: rename, comments and remove unnecessary clone
Yohan Wal
2025-01-20 15:38:22 +08:00
committed by GitHub
parent 64ce9d3744
commit 5287d46073
14 changed files with 806 additions and 410 deletions
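The caller-facing change, condensed from the hunks below: constructing the allocator becomes async and fallible, because the Kafka client is now built eagerly instead of during start().

// Before:
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
    opts.wal.clone().into(),
    kv_backend.clone(),
));

// After:
let kafka_options = opts.wal.clone().into();
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
    .await
    .context(BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);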

View File

@@ -345,6 +345,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build wal options allocator"))]
BuildWalOptionsAllocator {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -378,7 +385,8 @@ impl ErrorExt for Error {
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::BuildWalOptionsAllocator { source, .. }
| Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => {
StatusCode::Internal
}

View File

@@ -43,7 +43,7 @@ use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_telemetry::info;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
@@ -76,10 +76,10 @@ use tokio::sync::{broadcast, RwLock};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu,
InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu,
StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
BuildCacheRegistrySnafu, BuildWalOptionsAllocatorSnafu, CreateDirSnafu, IllegalConfigSnafu,
InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu,
Result, ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
@@ -569,10 +569,11 @@ impl StartCommand {
.step(10)
.build(),
);
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
opts.wal.clone().into(),
kv_backend.clone(),
));
let kafka_options = opts.wal.clone().into();
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
.await
.context(BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),

View File

@@ -51,10 +51,14 @@
//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping source table's {table_id} to {flownode_id}
//! - Used in `Flownode` booting.
//!
//! 11. View info key: `__view_info/{view_id}`
//! - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
//! - This key is mainly used in constructing the view in Datanode and Frontend.
//!
//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
//! - The key is used to mark existing topics in Kafka for the WAL.
//!
//! All keys have related managers. The managers take care of the serialization and deserialization
//! of keys and values, and the interaction with the underlying KV store backend.
//!
@@ -100,6 +104,7 @@ pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;
mod tombstone;
pub mod topic_name;
pub(crate) mod txn_helper;
pub mod view_info;
@@ -158,6 +163,9 @@ pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// The legacy topic key prefix, used to store topic names in previous versions.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
@@ -223,6 +231,11 @@ lazy_static! {
Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
}
lazy_static! {
pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
}
/// The key of metadata.
pub trait MetadataKey<'a, T> {
fn to_bytes(&self) -> Vec<u8>;
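For orientation, a sketch (not part of key.rs) of how topic metadata is laid out under the old and new prefixes defined above, assuming a pool of two topics:

// Legacy layout: a single key at LEGACY_TOPIC_KEY_PREFIX whose value is a JSON-encoded
// list of all topic names.
// New layout: one key per topic under KAFKA_TOPIC_KEY_PREFIX, each with an empty value, e.g.
//   __topic_name/kafka/greptimedb_wal_topic_0
//   __topic_name/kafka/greptimedb_wal_topic_1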

View File

@@ -0,0 +1,218 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self, Display};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result};
use crate::key::{
MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchPutRequest, RangeRequest};
use crate::rpc::KeyValue;
#[derive(Debug, Clone, PartialEq)]
pub struct TopicNameKey<'a> {
pub topic: &'a str,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TopicNameValue;
impl<'a> TopicNameKey<'a> {
pub fn new(topic: &'a str) -> Self {
Self { topic }
}
pub fn gen_with_id_and_prefix(id: usize, prefix: &'a str) -> String {
format!("{}_{}", prefix, id)
}
pub fn range_start_key() -> String {
KAFKA_TOPIC_KEY_PREFIX.to_string()
}
}
impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}
fn from_bytes(bytes: &'a [u8]) -> Result<TopicNameKey<'a>> {
let key = std::str::from_utf8(bytes).map_err(|e| {
InvalidMetadataSnafu {
err_msg: format!(
"TopicNameKey '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
TopicNameKey::try_from(key)
}
}
impl Display for TopicNameKey<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", KAFKA_TOPIC_KEY_PREFIX, self.topic)
}
}
impl<'a> TryFrom<&'a str> for TopicNameKey<'a> {
type Error = Error;
fn try_from(value: &'a str) -> Result<TopicNameKey<'a>> {
let captures = KAFKA_TOPIC_KEY_PATTERN
.captures(value)
.context(InvalidMetadataSnafu {
err_msg: format!("Invalid topic name key: {}", value),
})?;
// Safety: the regex match above guarantees the capture group exists.
Ok(TopicNameKey {
topic: captures.get(1).unwrap().as_str(),
})
}
}
/// Convert a key-value pair to a topic name.
fn topic_decoder(kv: &KeyValue) -> Result<String> {
let key = TopicNameKey::from_bytes(&kv.key)?;
Ok(key.topic.to_string())
}
pub struct TopicNameManager {
kv_backend: KvBackendRef,
}
impl Default for TopicNameManager {
fn default() -> Self {
Self::new(Arc::new(MemoryKvBackend::default()))
}
}
impl TopicNameManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Updates topics stored in the legacy format to the new format.
pub async fn update_legacy_topics(&self) -> Result<()> {
if let Some(kv) = self
.kv_backend
.get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
.await?
{
let topics =
serde_json::from_slice::<Vec<String>>(&kv.value).context(DecodeJsonSnafu)?;
let mut reqs = topics
.iter()
.map(|topic| {
let key = TopicNameKey::new(topic);
TxnOp::Put(key.to_bytes(), vec![])
})
.collect::<Vec<_>>();
let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec());
reqs.push(delete_req);
let txn = Txn::new().and_then(reqs);
self.kv_backend.txn(txn).await?;
}
Ok(())
}
/// Range query for topics.
/// Caution: this method returns the keys (as strings) rather than the values of the range query, since the topics are stored in the keys.
pub async fn range(&self) -> Result<Vec<String>> {
let prefix = TopicNameKey::range_start_key();
let raw_prefix = prefix.as_bytes();
let req = RangeRequest::new().with_prefix(raw_prefix);
let resp = self.kv_backend.range(req).await?;
resp.kvs
.iter()
.map(topic_decoder)
.collect::<Result<Vec<String>>>()
}
/// Puts topics into the kv backend.
pub async fn batch_put(&self, topic_name_keys: Vec<TopicNameKey<'_>>) -> Result<()> {
let req = BatchPutRequest {
kvs: topic_name_keys
.iter()
.map(|key| KeyValue {
key: key.to_bytes(),
value: vec![],
})
.collect(),
prev_kv: false,
};
self.kv_backend.batch_put(req).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
#[tokio::test]
async fn test_topic_name_key_manager() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = TopicNameManager::new(kv_backend.clone());
let mut all_topics = (0..16)
.map(|i| format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, i))
.collect::<Vec<_>>();
all_topics.sort();
let topic_name_keys = all_topics
.iter()
.map(|topic| TopicNameKey::new(topic))
.collect::<Vec<_>>();
manager.batch_put(topic_name_keys.clone()).await.unwrap();
let topics = manager.range().await.unwrap();
assert_eq!(topics, all_topics);
kv_backend
.put(PutRequest {
key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
value: serde_json::to_vec(&all_topics).unwrap(),
prev_kv: false,
})
.await
.unwrap();
manager.update_legacy_topics().await.unwrap();
let res = kv_backend
.get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
.await
.unwrap();
assert!(res.is_none());
let topics = manager.range().await.unwrap();
assert_eq!(topics, all_topics);
let topics = manager.range().await.unwrap();
assert_eq!(topics, all_topics);
}
}
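A small usage sketch of the key type above (assuming the MetadataKey trait is in scope); it only restates what the Display and from_bytes implementations guarantee:

let key = TopicNameKey::new("greptimedb_wal_topic_0");
// Display prefixes the topic with KAFKA_TOPIC_KEY_PREFIX.
assert_eq!(key.to_string(), "__topic_name/kafka/greptimedb_wal_topic_0");
// from_bytes matches KAFKA_TOPIC_KEY_PATTERN and recovers the bare topic name.
let decoded = TopicNameKey::from_bytes(b"__topic_name/kafka/greptimedb_wal_topic_0").unwrap();
assert_eq!(decoded.topic, "greptimedb_wal_topic_0");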

View File

@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod kafka;
mod selector;
mod topic_creator;
mod topic_manager;
mod topic_pool;
use std::collections::HashMap;
use std::sync::Arc;
@@ -26,35 +29,26 @@ use store_api::storage::{RegionId, RegionNumber};
use crate::error::{EncodeWalOptionsSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::wal_options_allocator::kafka::topic_manager::TopicManager as KafkaTopicManager;
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.
#[derive(Default)]
pub enum WalOptionsAllocator {
#[default]
RaftEngine,
Kafka(KafkaTopicManager),
Kafka(KafkaTopicPool),
}
/// Arc wrapper of WalOptionsAllocator.
pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
impl WalOptionsAllocator {
/// Creates a WalOptionsAllocator.
pub fn new(config: MetasrvWalConfig, kv_backend: KvBackendRef) -> Self {
match config {
MetasrvWalConfig::RaftEngine => Self::RaftEngine,
MetasrvWalConfig::Kafka(kafka_config) => {
Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend))
}
}
}
/// Tries to start the allocator.
pub async fn start(&self) -> Result<()> {
match self {
Self::RaftEngine => Ok(()),
Self::Kafka(kafka_topic_manager) => kafka_topic_manager.start().await,
Self::Kafka(kafka_topic_manager) => kafka_topic_manager.activate().await,
}
}
@@ -111,6 +105,21 @@ impl LeadershipChangeListener for WalOptionsAllocator {
}
}
/// Builds a wal options allocator based on the given configuration.
pub async fn build_wal_options_allocator(
config: &MetasrvWalConfig,
kv_backend: KvBackendRef,
) -> Result<WalOptionsAllocator> {
match config {
MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine),
MetasrvWalConfig::Kafka(kafka_config) => {
let topic_creator = build_kafka_topic_creator(kafka_config).await?;
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
Ok(WalOptionsAllocator::Kafka(topic_pool))
}
}
}
/// Allocates wal options for each region. The allocated wal options are encoded immediately.
pub fn allocate_region_wal_options(
regions: Vec<RegionNumber>,
@@ -146,14 +155,15 @@ mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal_options_allocator::kafka::topic_selector::RoundRobinTopicSelector;
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
#[tokio::test]
async fn test_allocator_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = MetasrvWalConfig::RaftEngine;
let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
let allocator = build_wal_options_allocator(&wal_config, kv_backend)
.await
.unwrap();
allocator.start().await.unwrap();
let num_regions = 32;
@@ -191,14 +201,13 @@ mod tests {
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let mut topic_manager = KafkaTopicManager::new(config.clone(), kv_backend);
// Replaces the default topic pool with the constructed topics.
topic_manager.topic_pool.clone_from(&topics);
// Replaces the default selector with a round-robin selector without shuffling.
topic_manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
topic_pool.topics.clone_from(&topics);
topic_pool.selector = Arc::new(selector::RoundRobinTopicSelector::default());
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_manager);
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();
let num_regions = 32;

View File

@@ -1,16 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod topic_manager;
pub mod topic_selector;

View File

@@ -1,350 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use common_telemetry::{error, info};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::TopicSelectorType;
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rskafka::BackoffConfig;
use snafu::{ensure, ResultExt};
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu,
ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
use crate::wal_options_allocator::kafka::topic_selector::{
RoundRobinTopicSelector, TopicSelectorRef,
};
const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/";
// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
/// Manages topic initialization and selection.
pub struct TopicManager {
config: MetasrvKafkaConfig,
pub(crate) topic_pool: Vec<String>,
pub(crate) topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
}
impl TopicManager {
/// Creates a new topic manager.
pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
// Topics should be created.
let topics = (0..config.kafka_topic.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix))
.collect::<Vec<_>>();
let selector = match config.kafka_topic.selector_type {
TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
};
Self {
config,
topic_pool: topics,
topic_selector: Arc::new(selector),
kv_backend,
}
}
/// Tries to initialize the topic manager.
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
pub async fn start(&self) -> Result<()> {
// Skip creating topics.
if !self.config.auto_create_topics {
return Ok(());
}
let num_topics = self.config.kafka_topic.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
// Topics should be created.
let topics = &self.topic_pool;
// Topics already created.
// There may have extra topics created but it's okay since those topics won't break topic allocation.
let created_topics = Self::restore_created_topics(&self.kv_backend)
.await?
.into_iter()
.collect::<HashSet<String>>();
// Creates missing topics.
let to_be_created = topics
.iter()
.enumerate()
.filter_map(|(i, topic)| {
if created_topics.contains(topic) {
return None;
}
Some(i)
})
.collect::<Vec<_>>();
if !to_be_created.is_empty() {
self.try_create_topics(topics, &to_be_created).await?;
Self::persist_created_topics(topics, &self.kv_backend).await?;
}
Ok(())
}
/// Tries to create topics specified by indexes in `to_be_created`.
async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> {
// Builds an kafka controller client for creating topics.
let backoff_config = BackoffConfig {
init_backoff: self.config.backoff.init,
max_backoff: self.config.backoff.max,
base: self.config.backoff.base as f64,
deadline: self.config.backoff.deadline,
};
let broker_endpoints =
common_wal::resolve_to_ipv4(&self.config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
if let Some(sasl) = &self.config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
if let Some(tls) = &self.config.connection.tls {
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
};
let client = builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: self.config.connection.broker_endpoints.clone(),
})?;
let control_client = client
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;
// Try to create missing topics.
let tasks = to_be_created
.iter()
.map(|i| async {
self.try_create_topic(&topics[*i], &control_client).await?;
self.try_append_noop_record(&topics[*i], &client).await?;
Ok(())
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
/// Selects one topic from the topic pool through the topic selector.
pub fn select(&self) -> Result<&String> {
self.topic_selector.select(&self.topic_pool)
}
/// Selects a batch of topics from the topic pool through the topic selector.
pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&String>> {
(0..num_topics)
.map(|_| self.topic_selector.select(&self.topic_pool))
.collect()
}
async fn try_append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
let partition_client = client
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
.context(BuildKafkaPartitionClientSnafu {
topic,
partition: DEFAULT_PARTITION,
})?;
partition_client
.produce(
vec![Record {
key: None,
value: None,
timestamp: chrono::Utc::now(),
headers: Default::default(),
}],
Compression::Lz4,
)
.await
.context(ProduceRecordSnafu { topic })?;
Ok(())
}
async fn try_create_topic(&self, topic: &String, client: &ControllerClient) -> Result<()> {
match client
.create_topic(
topic.clone(),
self.config.kafka_topic.num_partitions,
self.config.kafka_topic.replication_factor,
self.config.kafka_topic.create_topic_timeout.as_millis() as i32,
)
.await
{
Ok(_) => {
info!("Successfully created topic {}", topic);
Ok(())
}
Err(e) => {
if Self::is_topic_already_exist_err(&e) {
info!("The topic {} already exists", topic);
Ok(())
} else {
error!("Failed to create a topic {}, error {:?}", topic, e);
Err(e).context(CreateKafkaWalTopicSnafu)
}
}
}
}
async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<String>> {
kv_backend
.get(CREATED_TOPICS_KEY.as_bytes())
.await?
.map_or_else(
|| Ok(vec![]),
|key_value| serde_json::from_slice(&key_value.value).context(DecodeJsonSnafu),
)
}
async fn persist_created_topics(topics: &[String], kv_backend: &KvBackendRef) -> Result<()> {
let raw_topics = serde_json::to_vec(topics).context(EncodeJsonSnafu)?;
kv_backend
.put(PutRequest {
key: CREATED_TOPICS_KEY.as_bytes().to_vec(),
value: raw_topics,
prev_kv: false,
})
.await
.map(|_| ())
}
fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: TopicAlreadyExists,
..
}
)
}
}
#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
// Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend.
#[tokio::test]
async fn test_restore_persisted_topics() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_name_prefix = "greptimedb_wal_topic";
let num_topics = 16;
// Constructs mock topics.
let topics = (0..num_topics)
.map(|topic| format!("{topic_name_prefix}{topic}"))
.collect::<Vec<_>>();
// Persists topics to kv backend.
TopicManager::persist_created_topics(&topics, &kv_backend)
.await
.unwrap();
// Restores topics from kv backend.
let restored_topics = TopicManager::restore_created_topics(&kv_backend)
.await
.unwrap();
assert_eq!(topics, restored_topics);
}
/// Tests that the topic manager could allocate topics correctly.
#[tokio::test]
async fn test_alloc_topics() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
// Constructs topics that should be created.
let topics = (0..256)
.map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let mut manager = TopicManager::new(config.clone(), kv_backend);
// Replaces the default topic pool with the constructed topics.
manager.topic_pool.clone_from(&topics);
// Replaces the default selector with a round-robin selector without shuffled.
manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());
manager.start().await.unwrap();
// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| manager.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects exactly the number of `num_topics` topics in a batching manner.
let got = manager
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects more than the number of `num_topics` topics.
let got = manager
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
})
})
.await;
}
}

View File

@@ -0,0 +1,159 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{error, info};
use common_wal::config::kafka::MetasrvKafkaConfig;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rskafka::BackoffConfig;
use snafu::ResultExt;
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
TlsConfigSnafu,
};
// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
/// Creates topics in kafka.
pub struct KafkaTopicCreator {
client: Client,
/// The number of partitions per topic.
num_partitions: i32,
/// The replication factor of each topic.
replication_factor: i16,
/// The timeout of topic creation in milliseconds.
create_topic_timeout: i32,
}
impl KafkaTopicCreator {
async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
let controller = client
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;
match controller
.create_topic(
topic,
self.num_partitions,
self.replication_factor,
self.create_topic_timeout,
)
.await
{
Ok(_) => {
info!("Successfully created topic {}", topic);
Ok(())
}
Err(e) => {
if Self::is_topic_already_exist_err(&e) {
info!("The topic {} already exists", topic);
Ok(())
} else {
error!("Failed to create a topic {}, error {:?}", topic, e);
Err(e).context(CreateKafkaWalTopicSnafu)
}
}
}
}
async fn append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
let partition_client = client
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
.context(BuildKafkaPartitionClientSnafu {
topic,
partition: DEFAULT_PARTITION,
})?;
partition_client
.produce(
vec![Record {
key: None,
value: None,
timestamp: chrono::Utc::now(),
headers: Default::default(),
}],
Compression::Lz4,
)
.await
.context(ProduceRecordSnafu { topic })?;
Ok(())
}
/// Prepares topics in Kafka.
/// 1. Creates missing topics.
/// 2. Appends a noop record to each topic.
pub async fn prepare_topics(&self, topics: &[&String]) -> Result<()> {
// Try to create missing topics.
let tasks = topics
.iter()
.map(|topic| async {
self.create_topic(topic, &self.client).await?;
self.append_noop_record(topic, &self.client).await?;
Ok(())
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: TopicAlreadyExists,
..
}
)
}
}
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
// Builds a Kafka client for the topic creator.
let backoff_config = BackoffConfig {
init_backoff: config.backoff.init,
max_backoff: config.backoff.max,
base: config.backoff.base as f64,
deadline: config.backoff.deadline,
};
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
if let Some(tls) = &config.connection.tls {
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
};
let client = builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: config.connection.broker_endpoints.clone(),
})?;
Ok(KafkaTopicCreator {
client,
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
})
}
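How the creator is wired in, condensed from build_wal_options_allocator and the topic pool tests elsewhere in this diff (error handling elided):

let topic_creator = build_kafka_topic_creator(&kafka_config).await?;
let topic_pool = KafkaTopicPool::new(&kafka_config, kv_backend, topic_creator);
// On leader activation, creates any missing topics and persists the pool in the kv backend.
topic_pool.activate().await?;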

View File

@@ -0,0 +1,165 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use crate::error::Result;
use crate::key::topic_name::{TopicNameKey, TopicNameManager};
use crate::kv_backend::KvBackendRef;
/// Manages topics in the kv backend.
/// Responsible for:
/// 1. Restoring and persisting topics in the kv backend.
/// 2. Clearing topics stored in the legacy format and restoring them in the new format.
pub struct KafkaTopicManager {
key_manager: TopicNameManager,
}
impl KafkaTopicManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
key_manager: TopicNameManager::new(kv_backend),
}
}
async fn restore_topics(&self) -> Result<Vec<String>> {
self.key_manager.update_legacy_topics().await?;
let topics = self.key_manager.range().await?;
Ok(topics)
}
/// Restores topics from the key-value backend and returns the topics that are not yet stored in it.
pub async fn get_topics_to_create<'a>(
&self,
all_topics: &'a [String],
) -> Result<Vec<&'a String>> {
let existing_topics = self.restore_topics().await?;
let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
let mut topics_to_create = Vec::with_capacity(all_topics.len());
for topic in all_topics {
if !existing_topic_set.contains(topic) {
topics_to_create.push(topic);
}
}
Ok(topics_to_create)
}
/// Persists topics into the key-value backend.
pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
self.key_manager
.batch_put(
topics
.iter()
.map(|topic| TopicNameKey::new(topic))
.collect(),
)
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::key::LEGACY_TOPIC_KEY_PREFIX;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::rpc::store::PutRequest;
#[tokio::test]
async fn test_restore_legacy_persisted_topics() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend.clone());
let all_topics = (0..16)
.map(|i| format!("greptimedb_wal_topic_{}", i))
.collect::<Vec<_>>();
// No legacy topics.
let mut topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.await
.unwrap();
topics_to_be_created.sort();
let mut expected = all_topics.iter().collect::<Vec<_>>();
expected.sort();
assert_eq!(expected, topics_to_be_created);
// A topic pool with 16 topics stored in kvbackend in legacy format.
let topics = "[\"greptimedb_wal_topic_0\",\"greptimedb_wal_topic_1\",\"greptimedb_wal_topic_2\",\"greptimedb_wal_topic_3\",\"greptimedb_wal_topic_4\",\"greptimedb_wal_topic_5\",\"greptimedb_wal_topic_6\",\"greptimedb_wal_topic_7\",\"greptimedb_wal_topic_8\",\"greptimedb_wal_topic_9\",\"greptimedb_wal_topic_10\",\"greptimedb_wal_topic_11\",\"greptimedb_wal_topic_12\",\"greptimedb_wal_topic_13\",\"greptimedb_wal_topic_14\",\"greptimedb_wal_topic_15\"]";
let put_req = PutRequest {
key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
value: topics.as_bytes().to_vec(),
prev_kv: true,
};
let res = kv_backend.put(put_req).await.unwrap();
assert!(res.prev_kv.is_none());
let topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.await
.unwrap();
assert!(topics_to_be_created.is_empty());
// Legacy topics should be deleted after restoring.
let legacy_topics = kv_backend
.get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
.await
.unwrap();
assert!(legacy_topics.is_none());
// Then we can restore them from the new format.
let mut restored_topics = topic_kvbackend_manager.restore_topics().await.unwrap();
restored_topics.sort();
let mut expected = all_topics.clone();
expected.sort();
assert_eq!(expected, restored_topics);
}
// Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend.
#[tokio::test]
async fn test_restore_persisted_topics() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_name_prefix = "greptimedb_wal_topic";
let num_topics = 16;
let all_topics = (0..num_topics)
.map(|i| format!("{}_{}", topic_name_prefix, i))
.collect::<Vec<_>>();
// Constructs mock topics.
let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend);
let mut topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.await
.unwrap();
topics_to_be_created.sort();
let mut expected = all_topics.iter().collect::<Vec<_>>();
expected.sort();
assert_eq!(expected, topics_to_be_created);
// Persists topics to kv backend.
topic_kvbackend_manager
.persist_topics(&all_topics)
.await
.unwrap();
let topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.await
.unwrap();
assert!(topics_to_be_created.is_empty());
}
}

View File

@@ -0,0 +1,180 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::TopicSelectorType;
use snafu::ensure;
use crate::error::{InvalidNumTopicsSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::wal_options_allocator::selector::{RoundRobinTopicSelector, TopicSelectorRef};
use crate::wal_options_allocator::topic_creator::KafkaTopicCreator;
use crate::wal_options_allocator::topic_manager::KafkaTopicManager;
/// Topic pool for the Kafka remote WAL.
/// Responsible for:
/// 1. Persisting topics in the kv backend.
/// 2. Creating topics in Kafka.
/// 3. Selecting topics for regions.
pub struct KafkaTopicPool {
pub(crate) topics: Vec<String>,
// Manages topics in kvbackend.
topic_manager: KafkaTopicManager,
// Creates topics in kafka.
topic_creator: KafkaTopicCreator,
pub(crate) selector: TopicSelectorRef,
auto_create_topics: bool,
}
impl KafkaTopicPool {
pub fn new(
config: &MetasrvKafkaConfig,
kvbackend: KvBackendRef,
topic_creator: KafkaTopicCreator,
) -> Self {
let num_topics = config.kafka_topic.num_topics;
let prefix = config.kafka_topic.topic_name_prefix.clone();
let topics = (0..num_topics)
.map(|i| format!("{}_{}", prefix, i))
.collect();
let selector = match config.kafka_topic.selector_type {
TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
};
let topic_manager = KafkaTopicManager::new(kvbackend);
Self {
topics,
topic_manager,
topic_creator,
selector: Arc::new(selector),
auto_create_topics: config.auto_create_topics,
}
}
/// Tries to activate the topic pool when the metasrv becomes the leader.
/// It first restores persisted topics from the kv backend.
/// If not enough topics are retrieved, it contacts the Kafka cluster to create the missing topics.
pub async fn activate(&self) -> Result<()> {
if !self.auto_create_topics {
return Ok(());
}
let num_topics = self.topics.len();
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
let topics_to_be_created = self
.topic_manager
.get_topics_to_create(&self.topics)
.await?;
if !topics_to_be_created.is_empty() {
self.topic_creator
.prepare_topics(&topics_to_be_created)
.await?;
self.topic_manager.persist_topics(&self.topics).await?;
}
Ok(())
}
/// Selects one topic from the topic pool through the topic selector.
pub fn select(&self) -> Result<&String> {
self.selector.select(&self.topics)
}
/// Selects a batch of topics from the topic pool through the topic selector.
pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&String>> {
(0..num_topics)
.map(|_| self.selector.select(&self.topics))
.collect()
}
}
#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
/// Tests that the topic manager could allocate topics correctly.
#[tokio::test]
async fn test_alloc_topics() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
// Constructs topics that should be created.
let topics = (0..256)
.map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
// Replaces the default topic pool with the constructed topics.
topic_pool.topics.clone_from(&topics);
// Replaces the default selector with a round-robin selector without shuffling.
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
topic_pool.activate().await.unwrap();
// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| topic_pool.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects exactly the number of `num_topics` topics in a batching manner.
let got = topic_pool
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects more than the number of `num_topics` topics.
let got = topic_pool
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
})
})
.await;
}
}

View File

@@ -742,6 +742,13 @@ pub enum Error {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to build wal options allocator"))]
BuildWalOptionsAllocator {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
}
impl Error {
@@ -788,7 +795,8 @@ impl ErrorExt for Error {
| Error::PeerUnavailable { .. }
| Error::ExceededDeadline { .. }
| Error::ChooseItems { .. }
| Error::FlowStateHandler { .. } => StatusCode::Internal,
| Error::FlowStateHandler { .. }
| Error::BuildWalOptionsAllocator { .. } => StatusCode::Internal,
Error::Unsupported { .. } => StatusCode::Unsupported,

View File

@@ -36,7 +36,7 @@ use common_meta::node_manager::NodeManagerRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
@@ -44,7 +44,7 @@ use snafu::ResultExt;
use super::{SelectTarget, FLOW_ID_SEQ};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, Result};
use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::failure_handler::RegionFailureHandler;
@@ -208,10 +208,10 @@ impl MetasrvBuilder {
table_id: None,
};
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
options.wal.clone(),
kv_backend.clone(),
));
let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
.await
.context(BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
let is_remote_wal = wal_options_allocator.is_remote_wal();
let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
let sequence = Arc::new(

View File

@@ -35,7 +35,7 @@ use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
@@ -190,10 +190,11 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
opts.wal.clone().into(),
kv_backend.clone(),
));
let kafka_options = opts.wal.clone().into();
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
.await
.unwrap();
let wal_options_allocator = Arc::new(wal_options_allocator);
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),