From 864cc117b31499299a18e01c9977246cc080cc9d Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Fri, 16 May 2025 19:26:47 +0800
Subject: [PATCH] fix: append noop entry when auto topic creation is disabled
 (#6092)

* feat: improve topic management and add stale records cleanup

* fix: fix unit tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR

---
 src/common/meta/src/error.rs                  |  23 +-
 src/common/meta/src/test_util.rs              |  36 ++-
 src/common/meta/src/wal_options_allocator.rs  |  92 +++---
 .../wal_options_allocator/topic_creator.rs    | 301 ++++++++++++++++--
 .../wal_options_allocator/topic_manager.rs    |  35 +-
 .../src/wal_options_allocator/topic_pool.rs   | 229 ++++++++-----
 src/common/wal/src/config/kafka/common.rs     |  11 +-
 src/common/wal/src/test_util.rs               |  30 ++
 src/log-store/src/kafka/client_manager.rs     |   8 +
 src/log-store/src/kafka/log_store.rs          |  11 +
 src/meta-srv/src/metasrv/builder.rs           |   2 +-
 src/meta-srv/src/procedure/wal_prune.rs       |   3 +-
 .../src/procedure/wal_prune/test_util.rs      |  10 +-
 src/mito2/src/test_util.rs                    |   6 +
 tests-integration/fixtures/docker-compose.yml |   2 +-
 15 files changed, 618 insertions(+), 181 deletions(-)

diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index 1bc85a898d..e3f23c64af 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -514,11 +514,25 @@ pub enum Error {
     },
 
     #[snafu(display(
-        "Failed to build a Kafka partition client, topic: {}, partition: {}",
+        "Failed to get a Kafka partition client, topic: {}, partition: {}",
        topic,
        partition
    ))]
-    BuildKafkaPartitionClient {
+    KafkaPartitionClient {
        topic: String,
        partition: i32,
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: rskafka::client::error::Error,
    },
+
+    #[snafu(display(
+        "Failed to get offset from Kafka, topic: {}, partition: {}",
+        topic,
+        partition
+    ))]
+    KafkaGetOffset {
+        topic: String,
+        partition: i32,
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        error: rskafka::client::error::Error,
+    },
@@ -843,7 +857,7 @@ impl ErrorExt for Error {
            | EncodeWalOptions { .. }
            | BuildKafkaClient { .. }
            | BuildKafkaCtrlClient { .. }
-            | BuildKafkaPartitionClient { .. }
+            | KafkaPartitionClient { .. }
            | ResolveKafkaEndpoint { .. }
            | ProduceRecord { .. }
            | CreateKafkaWalTopic { .. }
@@ -852,7 +866,8 @@ impl ErrorExt for Error {
            | ProcedureOutput { .. }
            | FromUtf8 { .. }
            | MetadataCorruption { .. }
-            | ParseWalOptions { .. } => StatusCode::Unexpected,
+            | ParseWalOptions { .. }
+            | KafkaGetOffset { .. } => StatusCode::Unexpected,
 
            SendMessage { .. }
            | GetKvCache { .. }
            | CacheNotGet { .. } => StatusCode::Internal,
diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs
index 0f94aad814..f87f3df565 100644
--- a/src/common/meta/src/test_util.rs
+++ b/src/common/meta/src/test_util.rs
@@ -20,6 +20,8 @@ use api::v1::region::{InsertRequests, RegionRequest};
 pub use common_base::AffectedRows;
 use common_query::request::QueryRequest;
 use common_recordbatch::SendableRecordBatchStream;
+use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
+use common_wal::config::kafka::MetasrvKafkaConfig;
 
 use crate::cache_invalidator::DummyCacheInvalidator;
 use crate::ddl::flow_meta::FlowMetadataAllocator;
@@ -37,7 +39,8 @@ use crate::peer::{Peer, PeerLookupService};
 use crate::region_keeper::MemoryRegionKeeper;
 use crate::region_registry::LeaderRegionRegistry;
 use crate::sequence::SequenceBuilder;
-use crate::wal_options_allocator::WalOptionsAllocator;
+use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
+use crate::wal_options_allocator::{build_kafka_topic_creator, WalOptionsAllocator};
 use crate::{DatanodeId, FlownodeId};
 
 #[async_trait::async_trait]
@@ -199,3 +202,34 @@ impl PeerLookupService for NoopPeerLookupService {
         Ok(Some(Peer::empty(id)))
     }
 }
+
+/// Creates a kafka topic pool for testing.
+pub async fn test_kafka_topic_pool(
+    broker_endpoints: Vec<String>,
+    num_topics: usize,
+    auto_create_topics: bool,
+    topic_name_prefix: Option<&str>,
+) -> KafkaTopicPool {
+    let mut config = MetasrvKafkaConfig {
+        connection: KafkaConnectionConfig {
+            broker_endpoints,
+            ..Default::default()
+        },
+        kafka_topic: KafkaTopicConfig {
+            num_topics,
+            ..Default::default()
+        },
+        auto_create_topics,
+        ..Default::default()
+    };
+    if let Some(prefix) = topic_name_prefix {
+        config.kafka_topic.topic_name_prefix = prefix.to_string();
+    }
+    let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+    let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
+        .await
+        .unwrap();
+
+    KafkaTopicPool::new(&config, kv_backend, topic_creator)
+}
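For context, a typical caller of the new helper combines it with the `GT_KAFKA_ENDPOINTS` utilities added later in this patch. A minimal sketch, in which only the test name and topic prefix are invented:

```rust
// Hypothetical smoke test; `test_kafka_topic_pool`, `get_kafka_endpoints`,
// and `maybe_skip_kafka_integration_test!` are the items this patch introduces.
#[tokio::test]
async fn test_pool_smoke() {
    maybe_skip_kafka_integration_test!();
    let pool = test_kafka_topic_pool(
        get_kafka_endpoints(), // broker list from GT_KAFKA_ENDPOINTS
        2,                     // num_topics
        true,                  // auto_create_topics
        Some("smoke"),         // topic_name_prefix
    )
    .await;
    // Creates the topics, appends the noop entry, and persists the topic names.
    pool.activate().await.unwrap();
}
```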
diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs
index a6e1482f04..9be4655dd7 100644
--- a/src/common/meta/src/wal_options_allocator.rs
+++ b/src/common/meta/src/wal_options_allocator.rs
@@ -112,7 +112,9 @@ pub async fn build_wal_options_allocator(
                NAME_PATTERN_REGEX.is_match(prefix),
                InvalidTopicNamePrefixSnafu { prefix }
            );
-            let topic_creator = build_kafka_topic_creator(kafka_config).await?;
+            let topic_creator =
+                build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
+                    .await?;
            let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
            Ok(WalOptionsAllocator::Kafka(topic_pool))
        }
@@ -151,13 +153,16 @@ pub fn prepare_wal_options(
 mod tests {
     use std::assert_matches::assert_matches;
 
-    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
+    use common_wal::config::kafka::common::KafkaTopicConfig;
     use common_wal::config::kafka::MetasrvKafkaConfig;
-    use common_wal::test_util::run_test_with_kafka_wal;
+    use common_wal::maybe_skip_kafka_integration_test;
+    use common_wal::test_util::get_kafka_endpoints;
 
     use super::*;
     use crate::error::Error;
     use crate::kv_backend::memory::MemoryKvBackend;
+    use crate::test_util::test_kafka_topic_pool;
+    use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
 
     // Tests that the wal options allocator could successfully allocate raft-engine wal options.
     #[tokio::test]
@@ -197,55 +202,42 @@ mod tests {
         assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
     }
 
-    // Tests that the wal options allocator could successfully allocate Kafka wal options.
     #[tokio::test]
-    async fn test_allocator_with_kafka() {
-        run_test_with_kafka_wal(|broker_endpoints| {
-            Box::pin(async {
-                let topics = (0..256)
-                    .map(|i| format!("test_allocator_with_kafka_{}_{}", i, uuid::Uuid::new_v4()))
-                    .collect::<Vec<_>>();
-
-                // Creates a topic manager.
-                let kafka_topic = KafkaTopicConfig {
-                    replication_factor: broker_endpoints.len() as i16,
-                    ..Default::default()
-                };
-                let config = MetasrvKafkaConfig {
-                    connection: KafkaConnectionConfig {
-                        broker_endpoints,
-                        ..Default::default()
-                    },
-                    kafka_topic,
-                    ..Default::default()
-                };
-                let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
-                let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
-                let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
-                topic_pool.topics.clone_from(&topics);
-                topic_pool.selector = Arc::new(selector::RoundRobinTopicSelector::default());
-
-                // Creates an options allocator.
-                let allocator = WalOptionsAllocator::Kafka(topic_pool);
-                allocator.start().await.unwrap();
-
-                let num_regions = 32;
-                let regions = (0..num_regions).collect::<Vec<_>>();
-                let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
-
-                // Check the allocated wal options contain the expected topics.
-                let expected = (0..num_regions)
-                    .map(|i| {
-                        let options = WalOptions::Kafka(KafkaWalOptions {
-                            topic: topics[i as usize].clone(),
-                        });
-                        (i, serde_json::to_string(&options).unwrap())
-                    })
-                    .collect::<HashMap<_, _>>();
-                assert_eq!(got, expected);
-            })
-        })
+    async fn test_allocator_with_kafka_allocate_wal_options() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let num_topics = 5;
+        let mut topic_pool = test_kafka_topic_pool(
+            get_kafka_endpoints(),
+            num_topics,
+            true,
+            Some("test_allocator_with_kafka"),
+        )
         .await;
+        topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
+        let topics = topic_pool.topics.clone();
+        // Clean up the topics before the test.
+        let topic_creator = topic_pool.topic_creator();
+        topic_creator.delete_topics(&topics).await.unwrap();
+
+        // Creates an options allocator.
+        let allocator = WalOptionsAllocator::Kafka(topic_pool);
+        allocator.start().await.unwrap();
+
+        let num_regions = 3;
+        let regions = (0..num_regions).collect::<Vec<_>>();
+        let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
+
+        // Check that the allocated wal options contain the expected topics.
+        let expected = (0..num_regions)
+            .map(|i| {
+                let options = WalOptions::Kafka(KafkaWalOptions {
+                    topic: topics[i as usize].clone(),
+                });
+                (i, serde_json::to_string(&options).unwrap())
+            })
+            .collect::<HashMap<_, _>>();
+        assert_eq!(got, expected);
     }
 
     #[tokio::test]
diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs
index 1a023546d3..3b34331126 100644
--- a/src/common/meta/src/wal_options_allocator/topic_creator.rs
+++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs
@@ -12,20 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_telemetry::{error, info};
-use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
-use common_wal::config::kafka::MetasrvKafkaConfig;
+use common_telemetry::{debug, error, info};
+use common_wal::config::kafka::common::{
+    KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG,
+};
 use rskafka::client::error::Error as RsKafkaError;
 use rskafka::client::error::ProtocolError::TopicAlreadyExists;
-use rskafka::client::partition::{Compression, UnknownTopicHandling};
+use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
 use rskafka::client::{Client, ClientBuilder};
 use rskafka::record::Record;
 use snafu::ResultExt;
 
 use crate::error::{
-    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
-    CreateKafkaWalTopicSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
-    TlsConfigSnafu,
+    BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
+    KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
+    Result, TlsConfigSnafu,
 };
 
 // Each topic only has one partition for now.
@@ -70,21 +71,47 @@ impl KafkaTopicCreator {
                info!("The topic {} already exists", topic);
                Ok(())
            } else {
-                error!("Failed to create a topic {}, error {:?}", topic, e);
+                error!(e; "Failed to create a topic {}", topic);
                Err(e).context(CreateKafkaWalTopicSnafu)
            }
        }
    }
 
-    async fn append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
-        let partition_client = client
+    async fn prepare_topic(&self, topic: &String) -> Result<()> {
+        let partition_client = self.partition_client(topic).await?;
+        self.append_noop_record(topic, &partition_client).await?;
+        Ok(())
+    }
+
+    /// Creates a [PartitionClient] for the given topic.
+    async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
+        self.client
            .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
            .await
-            .context(BuildKafkaPartitionClientSnafu {
+            .context(KafkaPartitionClientSnafu {
                topic,
                partition: DEFAULT_PARTITION,
+            })
+    }
+
+    /// Appends a noop record to the topic.
+    /// It only appends a noop record if the topic is empty.
+    async fn append_noop_record(
+        &self,
+        topic: &String,
+        partition_client: &PartitionClient,
+    ) -> Result<()> {
+        let end_offset = partition_client
+            .get_offset(OffsetAt::Latest)
+            .await
+            .context(KafkaGetOffsetSnafu {
+                topic: topic.to_string(),
+                partition: DEFAULT_PARTITION,
            })?;
+        if end_offset > 0 {
+            return Ok(());
+        }
 
        partition_client
            .produce(
@@ -98,22 +125,28 @@ impl KafkaTopicCreator {
            )
            .await
            .context(ProduceRecordSnafu { topic })?;
+        debug!("Appended a noop record to topic {}", topic);
        Ok(())
    }
 
+    /// Creates topics in Kafka.
+    pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
+        let tasks = topics
+            .iter()
+            .map(|topic| async { self.create_topic(topic, &self.client).await })
+            .collect::<Vec<_>>();
+        futures::future::try_join_all(tasks).await.map(|_| ())
+    }
+
    /// Prepares topics in Kafka.
-    /// 1. Creates missing topics.
-    /// 2. Appends a noop record to each topic.
-    pub async fn prepare_topics(&self, topics: &[&String]) -> Result<()> {
+    ///
+    /// It appends a noop record to each topic if the topic is empty.
+    pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
        let tasks = topics
            .iter()
-            .map(|topic| async {
-                self.create_topic(topic, &self.client).await?;
-                self.append_noop_record(topic, &self.client).await?;
-                Ok(())
-            })
+            .map(|topic| async { self.prepare_topic(topic).await })
            .collect::<Vec<_>>();
        futures::future::try_join_all(tasks).await.map(|_| ())
    }
@@ -129,34 +162,244 @@ impl KafkaTopicCreator {
    }
 }
 
+#[cfg(test)]
+impl KafkaTopicCreator {
+    pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
+        let tasks = topics
+            .iter()
+            .map(|topic| async { self.delete_topic(topic, &self.client).await })
+            .collect::<Vec<_>>();
+        futures::future::try_join_all(tasks).await.map(|_| ())
+    }
+
+    async fn delete_topic(&self, topic: &String, client: &Client) -> Result<()> {
+        let controller = client
+            .controller_client()
+            .context(BuildKafkaCtrlClientSnafu)?;
+        match controller.delete_topic(topic, 10).await {
+            Ok(_) => {
+                info!("Successfully deleted topic {}", topic);
+                Ok(())
+            }
+            Err(e) => {
+                if Self::is_unknown_topic_err(&e) {
+                    info!("The topic {} does not exist", topic);
+                    Ok(())
+                } else {
+                    panic!("Failed to delete a topic {}, error: {}", topic, e);
+                }
+            }
+        }
+    }
+
+    fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
+        matches!(
+            e,
+            &RsKafkaError::ServerError {
+                protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
+                ..
+            }
+        )
+    }
+
+    pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
+        self.partition_client(topic).await.unwrap()
+    }
+}
+
 /// Builds a kafka [Client](rskafka::client::Client).
-pub async fn build_kafka_client(config: &MetasrvKafkaConfig) -> Result<Client> {
+pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
    // Builds an kafka controller client for creating topics.
-    let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
+    let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
        .await
        .context(ResolveKafkaEndpointSnafu)?;
    let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
-    if let Some(sasl) = &config.connection.sasl {
+    if let Some(sasl) = &connection.sasl {
        builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
    };
-    if let Some(tls) = &config.connection.tls {
+    if let Some(tls) = &connection.tls {
        builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
    };
    builder
        .build()
        .await
        .with_context(|_| BuildKafkaClientSnafu {
-            broker_endpoints: config.connection.broker_endpoints.clone(),
+            broker_endpoints: connection.broker_endpoints.clone(),
        })
 }
 
 /// Builds a [KafkaTopicCreator].
-pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
-    let client = build_kafka_client(config).await?;
+pub async fn build_kafka_topic_creator(
+    connection: &KafkaConnectionConfig,
+    kafka_topic: &KafkaTopicConfig,
+) -> Result<KafkaTopicCreator> {
+    let client = build_kafka_client(connection).await?;
    Ok(KafkaTopicCreator {
        client,
-        num_partitions: config.kafka_topic.num_partitions,
-        replication_factor: config.kafka_topic.replication_factor,
-        create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
+        num_partitions: kafka_topic.num_partitions,
+        replication_factor: kafka_topic.replication_factor,
+        create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
    })
 }
+
+#[cfg(test)]
+mod tests {
+    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
+    use common_wal::maybe_skip_kafka_integration_test;
+    use common_wal::test_util::get_kafka_endpoints;
+
+    use super::*;
+
+    async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
+        let connection = KafkaConnectionConfig {
+            broker_endpoints,
+            ..Default::default()
+        };
+        let kafka_topic = KafkaTopicConfig::default();
+
+        build_kafka_topic_creator(&connection, &kafka_topic)
+            .await
+            .unwrap()
+    }
+
+    async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
+        for i in 0..num_records {
+            partition_client
+                .produce(
+                    vec![Record {
+                        key: Some(b"test".to_vec()),
+                        value: Some(format!("test {}", i).as_bytes().to_vec()),
+                        timestamp: chrono::Utc::now(),
+                        headers: Default::default(),
+                    }],
+                    Compression::Lz4,
+                )
+                .await
+                .unwrap();
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_append_noop_record_to_empty_topic() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let prefix = "append_noop_record_to_empty_topic";
+        let creator = test_topic_creator(get_kafka_endpoints()).await;
+
+        let topic = format!("{}{}", prefix, "0");
+        // Clean up the topics before the test.
+        creator.delete_topics(&[topic.to_string()]).await.unwrap();
+        creator.create_topics(&[topic.to_string()]).await.unwrap();
+
+        let partition_client = creator.partition_client(&topic).await.unwrap();
+        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
+        assert_eq!(end_offset, 0);
+
+        // The topic is empty, so a noop record is appended.
+        creator
+            .append_noop_record(&topic, &partition_client)
+            .await
+            .unwrap();
+        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
+        assert_eq!(end_offset, 1);
+    }
+
+    #[tokio::test]
+    async fn test_append_noop_record_to_non_empty_topic() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let prefix = "append_noop_record_to_non_empty_topic";
+        let creator = test_topic_creator(get_kafka_endpoints()).await;
+
+        let topic = format!("{}{}", prefix, "0");
+        // Clean up the topics before the test.
+        creator.delete_topics(&[topic.to_string()]).await.unwrap();
+
+        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        let partition_client = creator.partition_client(&topic).await.unwrap();
+        append_records(&partition_client, 2).await.unwrap();
+
+        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
+        assert_eq!(end_offset, 2);
+
+        // The topic is not empty, so no noop record is appended.
+        creator
+            .append_noop_record(&topic, &partition_client)
+            .await
+            .unwrap();
+        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
+        assert_eq!(end_offset, 2);
+    }
+
+    #[tokio::test]
+    async fn test_create_topic() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let prefix = "create_topic";
+        let creator = test_topic_creator(get_kafka_endpoints()).await;
+
+        let topic = format!("{}{}", prefix, "0");
+        // Clean up the topics before the test.
+        creator.delete_topics(&[topic.to_string()]).await.unwrap();
+
+        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        // Creating an existing topic again should be ok.
+        creator.create_topics(&[topic.to_string()]).await.unwrap();
+
+        let partition_client = creator.partition_client(&topic).await.unwrap();
+        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
+        assert_eq!(end_offset, 0);
+    }
+
+    #[tokio::test]
+    async fn test_prepare_topic() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let prefix = "prepare_topic";
+        let creator = test_topic_creator(get_kafka_endpoints()).await;
+
+        let topic = format!("{}{}", prefix, "0");
+        // Clean up the topics before the test.
+        creator.delete_topics(&[topic.to_string()]).await.unwrap();
+
+        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        creator.prepare_topic(&topic).await.unwrap();
+
+        let partition_client = creator.partition_client(&topic).await.unwrap();
+        let start_offset = partition_client
+            .get_offset(OffsetAt::Earliest)
+            .await
+            .unwrap();
+        assert_eq!(start_offset, 0);
+
+        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
+        assert_eq!(end_offset, 1);
+    }
+
+    #[tokio::test]
+    async fn test_prepare_topic_with_stale_records_without_pruning() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+
+        let prefix = "prepare_topic_with_stale_records_without_pruning";
+        let creator = test_topic_creator(get_kafka_endpoints()).await;
+
+        let topic = format!("{}{}", prefix, "0");
+        // Clean up the topics before the test.
+        creator.delete_topics(&[topic.to_string()]).await.unwrap();
+
+        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        let partition_client = creator.partition_client(&topic).await.unwrap();
+        append_records(&partition_client, 10).await.unwrap();
+
+        creator.prepare_topic(&topic).await.unwrap();
+
+        let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
+        assert_eq!(end_offset, 10);
+        let start_offset = partition_client
+            .get_offset(OffsetAt::Earliest)
+            .await
+            .unwrap();
+        assert_eq!(start_offset, 0);
+    }
+}
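A note on the offset arithmetic these tests rely on: `OffsetAt::Latest` returns the offset the next record would receive, so for a topic that has never been pruned it doubles as a record count. The sketch below restates the guard inside `append_noop_record` as a standalone helper; the name `is_topic_empty` is invented for illustration.

```rust
use rskafka::client::partition::{OffsetAt, PartitionClient};

// Hypothetical helper mirroring the guard in `append_noop_record`:
// a fresh topic reports Latest == 0; after the noop record it reports 1,
// which is exactly what `test_prepare_topic` asserts.
async fn is_topic_empty(
    client: &PartitionClient,
) -> Result<bool, rskafka::client::error::Error> {
    let end_offset = client.get_offset(OffsetAt::Latest).await?;
    Ok(end_offset == 0)
}
```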
diff --git a/src/common/meta/src/wal_options_allocator/topic_manager.rs b/src/common/meta/src/wal_options_allocator/topic_manager.rs
index ebe036b489..072298d0b2 100644
--- a/src/common/meta/src/wal_options_allocator/topic_manager.rs
+++ b/src/common/meta/src/wal_options_allocator/topic_manager.rs
@@ -40,24 +40,21 @@ impl KafkaTopicManager {
        Ok(topics)
    }
 
-    /// Restores topics from the key-value backend. and returns the topics that are not stored in kvbackend.
-    pub async fn get_topics_to_create<'a>(
-        &self,
-        all_topics: &'a [String],
-    ) -> Result<Vec<&'a String>> {
+    /// Returns the topics that are not prepared.
+    pub async fn unprepare_topics(&self, all_topics: &[String]) -> Result<Vec<String>> {
        let existing_topics = self.restore_topics().await?;
        let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
        let mut topics_to_create = Vec::with_capacity(all_topics.len());
        for topic in all_topics {
            if !existing_topic_set.contains(topic) {
-                topics_to_create.push(topic);
+                topics_to_create.push(topic.to_string());
            }
        }
        Ok(topics_to_create)
    }
 
-    /// Persists topics into the key-value backend.
-    pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
+    /// Persists prepared topics into the key-value backend.
+    pub async fn persist_prepared_topics(&self, topics: &[String]) -> Result<()> {
        self.topic_name_manager
            .batch_put(
                topics
@@ -70,6 +67,14 @@ impl KafkaTopicManager {
    }
 }
 
+#[cfg(test)]
+impl KafkaTopicManager {
+    /// Lists all topics in the key-value backend.
+    pub async fn list_topics(&self) -> Result<Vec<String>> {
+        self.topic_name_manager.range().await
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -90,11 +95,11 @@ mod tests {
 
        // No legacy topics.
        let mut topics_to_be_created = topic_kvbackend_manager
-            .get_topics_to_create(&all_topics)
+            .unprepare_topics(&all_topics)
            .await
            .unwrap();
        topics_to_be_created.sort();
-        let mut expected = all_topics.iter().collect::<Vec<_>>();
+        let mut expected = all_topics.clone();
        expected.sort();
        assert_eq!(expected, topics_to_be_created);
 
@@ -109,7 +114,7 @@ mod tests {
        assert!(res.prev_kv.is_none());
 
        let topics_to_be_created = topic_kvbackend_manager
-            .get_topics_to_create(&all_topics)
+            .unprepare_topics(&all_topics)
            .await
            .unwrap();
        assert!(topics_to_be_created.is_empty());
@@ -144,21 +149,21 @@ mod tests {
        let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend);
 
        let mut topics_to_be_created = topic_kvbackend_manager
-            .get_topics_to_create(&all_topics)
+            .unprepare_topics(&all_topics)
            .await
            .unwrap();
        topics_to_be_created.sort();
-        let mut expected = all_topics.iter().collect::<Vec<_>>();
+        let mut expected = all_topics.clone();
        expected.sort();
        assert_eq!(expected, topics_to_be_created);
 
        // Persists topics to kv backend.
        topic_kvbackend_manager
-            .persist_topics(&all_topics)
+            .persist_prepared_topics(&all_topics)
            .await
            .unwrap();
 
        let topics_to_be_created = topic_kvbackend_manager
-            .get_topics_to_create(&all_topics)
+            .unprepare_topics(&all_topics)
            .await
            .unwrap();
        assert!(topics_to_be_created.is_empty());
diff --git a/src/common/meta/src/wal_options_allocator/topic_pool.rs b/src/common/meta/src/wal_options_allocator/topic_pool.rs
index d29517e6d6..e77039d384 100644
--- a/src/common/meta/src/wal_options_allocator/topic_pool.rs
+++ b/src/common/meta/src/wal_options_allocator/topic_pool.rs
@@ -15,6 +15,7 @@
 use std::fmt::{self, Formatter};
 use std::sync::Arc;
 
+use common_telemetry::info;
 use common_wal::config::kafka::MetasrvKafkaConfig;
 use common_wal::TopicSelectorType;
 use snafu::ensure;
@@ -77,27 +78,35 @@ impl KafkaTopicPool {
    }
 
    /// Tries to activate the topic manager when metasrv becomes the leader.
+    ///
    /// First tries to restore persisted topics from the kv backend.
-    /// If not enough topics retrieved, it will try to contact the Kafka cluster and request creating more topics.
+    /// If there are unprepared topics (topics that exist in the configuration but not in the kv backend),
+    /// it will create these topics in Kafka if `auto_create_topics` is enabled.
+    ///
+    /// Then it prepares all unprepared topics by appending a noop record if the topic is empty,
+    /// and persists them in the kv backend for future use.
    pub async fn activate(&self) -> Result<()> {
-        if !self.auto_create_topics {
-            return Ok(());
-        }
-
        let num_topics = self.topics.len();
        ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
 
-        let topics_to_be_created = self
-            .topic_manager
-            .get_topics_to_create(&self.topics)
-            .await?;
+        let unprepared_topics = self.topic_manager.unprepare_topics(&self.topics).await?;
 
-        if !topics_to_be_created.is_empty() {
+        if !unprepared_topics.is_empty() {
+            if self.auto_create_topics {
+                info!("Creating {} topics", unprepared_topics.len());
+                self.topic_creator.create_topics(&unprepared_topics).await?;
+            } else {
+                info!("Auto topic creation is disabled, skipping topic creation");
+            }
            self.topic_creator
-                .prepare_topics(&topics_to_be_created)
+                .prepare_topics(&unprepared_topics)
+                .await?;
+            self.topic_manager
+                .persist_prepared_topics(&unprepared_topics)
                .await?;
-            self.topic_manager.persist_topics(&self.topics).await?;
        }
 
+        info!("Activated topic pool with {} topics", self.topics.len());
+
        Ok(())
    }
 
@@ -114,77 +123,147 @@ impl KafkaTopicPool {
    }
 }
 
+#[cfg(test)]
+impl KafkaTopicPool {
+    pub(crate) fn topic_manager(&self) -> &KafkaTopicManager {
+        &self.topic_manager
+    }
+
+    pub(crate) fn topic_creator(&self) -> &KafkaTopicCreator {
+        &self.topic_creator
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
-    use common_wal::test_util::run_test_with_kafka_wal;
+    use std::assert_matches::assert_matches;
+
+    use common_wal::maybe_skip_kafka_integration_test;
+    use common_wal::test_util::get_kafka_endpoints;
 
     use super::*;
-    use crate::kv_backend::memory::MemoryKvBackend;
-    use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
+    use crate::error::Error;
+    use crate::test_util::test_kafka_topic_pool;
+    use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
+
+    #[tokio::test]
+    async fn test_pool_invalid_number_topics_err() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let endpoints = get_kafka_endpoints();
+
+        let pool = test_kafka_topic_pool(endpoints.clone(), 0, false, None).await;
+        let err = pool.activate().await.unwrap_err();
+        assert_matches!(err, Error::InvalidNumTopics { .. });
+
+        let pool = test_kafka_topic_pool(endpoints, 0, true, None).await;
+        let err = pool.activate().await.unwrap_err();
+        assert_matches!(err, Error::InvalidNumTopics { .. });
+    }
+
+    #[tokio::test]
+    async fn test_pool_activate_unknown_topics_err() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let pool =
+            test_kafka_topic_pool(get_kafka_endpoints(), 1, false, Some("unknown_topic")).await;
+        let err = pool.activate().await.unwrap_err();
+        assert_matches!(err, Error::KafkaPartitionClient { .. });
+    }
+
+    #[tokio::test]
+    async fn test_pool_activate() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let pool =
+            test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some("pool_activate")).await;
+        // Clean up the topics before the test.
+        let topic_creator = pool.topic_creator();
+        topic_creator.delete_topics(&pool.topics).await.unwrap();
+
+        let topic_manager = pool.topic_manager();
+        pool.activate().await.unwrap();
+        let topics = topic_manager.list_topics().await.unwrap();
+        assert_eq!(topics.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_pool_activate_with_existing_topics() {
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let prefix = "pool_activate_with_existing_topics";
+        let pool = test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some(prefix)).await;
+        let topic_creator = pool.topic_creator();
+        topic_creator.delete_topics(&pool.topics).await.unwrap();
+
+        let topic_manager = pool.topic_manager();
+        // Persists one topic's info, so `pool.activate()` only creates the topics
+        // that are not persisted.
+        topic_manager
+            .persist_prepared_topics(&pool.topics[0..1])
+            .await
+            .unwrap();
+
+        pool.activate().await.unwrap();
+        let topics = topic_manager.list_topics().await.unwrap();
+        assert_eq!(topics.len(), 2);
+
+        let client = pool.topic_creator().client();
+        let topics = client
+            .list_topics()
+            .await
+            .unwrap()
+            .into_iter()
+            .filter(|t| t.name.starts_with(prefix))
+            .collect::<Vec<_>>();
+        assert_eq!(topics.len(), 1);
+    }
 
    /// Tests that the topic manager could allocate topics correctly.
    #[tokio::test]
    async fn test_alloc_topics() {
-        run_test_with_kafka_wal(|broker_endpoints| {
-            Box::pin(async {
-                // Constructs topics that should be created.
-                let topics = (0..256)
-                    .map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
-                    .collect::<Vec<_>>();
-
-                // Creates a topic manager.
-                let kafka_topic = KafkaTopicConfig {
-                    replication_factor: broker_endpoints.len() as i16,
-                    ..Default::default()
-                };
-                let config = MetasrvKafkaConfig {
-                    connection: KafkaConnectionConfig {
-                        broker_endpoints,
-                        ..Default::default()
-                    },
-                    kafka_topic,
-                    ..Default::default()
-                };
-                let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
-                let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
-                let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
-                // Replaces the default topic pool with the constructed topics.
-                topic_pool.topics.clone_from(&topics);
-                // Replaces the default selector with a round-robin selector without shuffled.
-                topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
-                topic_pool.activate().await.unwrap();
-
-                // Selects exactly the number of `num_topics` topics one by one.
-                let got = (0..topics.len())
-                    .map(|_| topic_pool.select().unwrap())
-                    .cloned()
-                    .collect::<Vec<_>>();
-                assert_eq!(got, topics);
-
-                // Selects exactly the number of `num_topics` topics in a batching manner.
-                let got = topic_pool
-                    .select_batch(topics.len())
-                    .unwrap()
-                    .into_iter()
-                    .map(ToString::to_string)
-                    .collect::<Vec<_>>();
-                assert_eq!(got, topics);
-
-                // Selects more than the number of `num_topics` topics.
-                let got = topic_pool
-                    .select_batch(2 * topics.len())
-                    .unwrap()
-                    .into_iter()
-                    .map(ToString::to_string)
-                    .collect::<Vec<_>>();
-                let expected = vec![topics.clone(); 2]
-                    .into_iter()
-                    .flatten()
-                    .collect::<Vec<_>>();
-                assert_eq!(got, expected);
-            })
-        })
+        common_telemetry::init_default_ut_logging();
+        maybe_skip_kafka_integration_test!();
+        let num_topics = 5;
+        let mut topic_pool = test_kafka_topic_pool(
+            get_kafka_endpoints(),
+            num_topics,
+            true,
+            Some("test_alloc_topics"),
+        )
         .await;
+        topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
+        let topics = topic_pool.topics.clone();
+        // Clean up the topics before the test.
+        let topic_creator = topic_pool.topic_creator();
+        topic_creator.delete_topics(&topics).await.unwrap();
+
+        // Selects exactly the number of `num_topics` topics one by one.
+        let got = (0..topics.len())
+            .map(|_| topic_pool.select().unwrap())
+            .cloned()
+            .collect::<Vec<_>>();
+        assert_eq!(got, topics);
+
+        // Selects exactly the number of `num_topics` topics in a batching manner.
+        let got = topic_pool
+            .select_batch(topics.len())
+            .unwrap()
+            .into_iter()
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+        assert_eq!(got, topics);
+
+        // Selects more than the number of `num_topics` topics.
+        let got = topic_pool
+            .select_batch(2 * topics.len())
+            .unwrap()
+            .into_iter()
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+        let expected = vec![topics.clone(); 2]
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(got, expected);
    }
 }
diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs
index e4763687cd..41f9a379db 100644
--- a/src/common/wal/src/config/kafka/common.rs
+++ b/src/common/wal/src/config/kafka/common.rs
@@ -23,11 +23,16 @@ use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};
 
 /// The default backoff config for kafka client.
+///
+/// If an operation fails, the client retries up to 3 times,
+/// with backoffs of 100ms, 300ms, and 900ms.
 pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
    init_backoff: Duration::from_millis(100),
-    max_backoff: Duration::from_secs(10),
-    base: 2.0,
-    deadline: Some(Duration::from_secs(120)),
+    max_backoff: Duration::from_secs(1),
+    base: 3.0,
+    // The deadline shouldn't be too long,
+    // otherwise the client will block the worker loop for a long time.
+    deadline: Some(Duration::from_secs(3)),
 };
 
 /// Default interval for auto WAL pruning.
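To make the new doc comment concrete: with `init_backoff = 100ms` and `base = 3.0`, the n-th wait is `min(100ms * 3^n, max_backoff)`, and retrying stops once the accumulated wait exceeds the 3s deadline. Below is a minimal sketch of that schedule, assuming no jitter (a real client may randomize each step). Note that under this model a fourth, capped 1s wait still fits before the deadline, so "retries up to 3 times" is an approximation:

```rust
use std::time::Duration;

// Sketch of the exponential schedule implied by DEFAULT_BACKOFF_CONFIG,
// assuming no jitter and that the deadline bounds the total waiting time.
fn backoff_schedule(init: Duration, base: f64, max: Duration, deadline: Duration) -> Vec<Duration> {
    let mut schedule = Vec::new();
    let mut next = init;
    let mut total = Duration::ZERO;
    while total + next <= deadline {
        schedule.push(next);
        total += next;
        next = next.mul_f64(base).min(max);
    }
    schedule
}

fn main() {
    let schedule = backoff_schedule(
        Duration::from_millis(100),
        3.0,
        Duration::from_secs(1),
        Duration::from_secs(3),
    );
    // Prints [100ms, 300ms, 900ms, 1s]: three exponential steps, then the cap.
    println!("{:?}", schedule);
}
```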
diff --git a/src/common/wal/src/test_util.rs b/src/common/wal/src/test_util.rs
index 9dab26e837..a01e49feaa 100644
--- a/src/common/wal/src/test_util.rs
+++ b/src/common/wal/src/test_util.rs
@@ -31,3 +31,33 @@ where
 
     test(endpoints).await
 }
+
+/// Gets the kafka endpoints from the environment variable `GT_KAFKA_ENDPOINTS`.
+///
+/// The format of the environment variable is:
+/// ```text
+/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
+/// ```
+pub fn get_kafka_endpoints() -> Vec<String> {
+    let endpoints = std::env::var("GT_KAFKA_ENDPOINTS").unwrap();
+    endpoints
+        .split(',')
+        .map(|s| s.trim().to_string())
+        .collect::<Vec<_>>()
+}
+
+#[macro_export]
+/// Skips the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
+///
+/// The format of the environment variable is:
+/// ```text
+/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
+/// ```
+macro_rules! maybe_skip_kafka_integration_test {
+    () => {
+        if std::env::var("GT_KAFKA_ENDPOINTS").is_err() {
+            common_telemetry::warn!("GT_KAFKA_ENDPOINTS is not set, skipping the test");
+            return;
+        }
+    };
+}
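Because the macro carries `#[macro_export]`, it is exported from the crate root rather than from `test_util`, which is why the test modules elsewhere in this patch import it as `common_wal::maybe_skip_kafka_integration_test` while `get_kafka_endpoints` keeps its module path. A short illustration; the test body is hypothetical:

```rust
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;

// The macro expands to an early `return` when GT_KAFKA_ENDPOINTS is unset,
// so the rest of the body only runs against a real broker.
#[tokio::test]
async fn requires_kafka_broker() {
    maybe_skip_kafka_integration_test!();
    let endpoints = get_kafka_endpoints(); // e.g. ["localhost:9092", "localhost:9093"]
    assert!(!endpoints.is_empty());
}
```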
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 65599cb09e..1e172a268c 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -182,6 +182,14 @@ impl ClientManager {
    }
 }
 
+#[cfg(test)]
+impl ClientManager {
+    /// Returns the controller client.
+    pub(crate) fn controller_client(&self) -> rskafka::client::controller::ControllerClient {
+        self.client.controller_client().unwrap()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use common_wal::test_util::run_test_with_kafka_wal;
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 66ad613bbc..ec1dce5339 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -552,6 +552,14 @@ mod tests {
            .collect()
    }
 
+    async fn prepare_topic(logstore: &KafkaLogStore, topic_name: &str) {
+        let controller_client = logstore.client_manager.controller_client();
+        controller_client
+            .create_topic(topic_name.to_string(), 1, 1, 5000)
+            .await
+            .unwrap();
+    }
+
    #[tokio::test]
    async fn test_append_batch_basic() {
        common_telemetry::init_default_ut_logging();
@@ -573,7 +581,9 @@ mod tests {
        };
        let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
        let topic_name = uuid::Uuid::new_v4().to_string();
+        prepare_topic(&logstore, &topic_name).await;
        let provider = Provider::kafka_provider(topic_name);
+
        let region_entries = (0..5)
            .map(|i| {
                let region_id = RegionId::new(1, i);
@@ -647,6 +657,7 @@ mod tests {
        };
        let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
        let topic_name = uuid::Uuid::new_v4().to_string();
+        prepare_topic(&logstore, &topic_name).await;
        let provider = Provider::kafka_provider(topic_name);
        let region_entries = (0..5)
            .map(|i| {
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 85cccaff11..3c1d30ce91 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -377,7 +377,7 @@ impl MetasrvBuilder {
                let (tx, rx) = WalPruneManager::channel();
                // Safety: Must be remote WAL.
                let remote_wal_options = options.wal.remote_wal_options().unwrap();
-                let kafka_client = build_kafka_client(remote_wal_options)
+                let kafka_client = build_kafka_client(&remote_wal_options.connection)
                    .await
                    .context(error::BuildKafkaClientSnafu)?;
                let wal_prune_context = WalPruneContext {
diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs
index 19258f1636..e608300328 100644
--- a/src/meta-srv/src/procedure/wal_prune.rs
+++ b/src/meta-srv/src/procedure/wal_prune.rs
@@ -52,7 +52,7 @@ use crate::Result;
 
 pub type KafkaClientRef = Arc<Client>;
 
-const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(1);
+const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);
 
 /// The state of WAL pruning.
 #[derive(Debug, Serialize, Deserialize)]
@@ -558,6 +558,7 @@ mod tests {
        topic_name = format!("test_procedure_execution-{}", topic_name);
        let mut env = TestEnv::new();
        let context = env.build_wal_prune_context(broker_endpoints).await;
+        TestEnv::prepare_topic(&context.client, &topic_name).await;
        let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);
 
        // Before any data in kvbackend is mocked, should return a retryable error.
diff --git a/src/meta-srv/src/procedure/wal_prune/test_util.rs b/src/meta-srv/src/procedure/wal_prune/test_util.rs
index b7cdbad286..baa9129d3c 100644
--- a/src/meta-srv/src/procedure/wal_prune/test_util.rs
+++ b/src/meta-srv/src/procedure/wal_prune/test_util.rs
@@ -78,7 +78,7 @@ impl TestEnv {
            kafka_topic,
            ..Default::default()
        };
-        Arc::new(build_kafka_client(&config).await.unwrap())
+        Arc::new(build_kafka_client(&config.connection).await.unwrap())
    }
 
    pub async fn build_wal_prune_context(&self, broker_endpoints: Vec<String>) -> WalPruneContext {
@@ -91,4 +91,12 @@ impl TestEnv {
            mailbox: self.mailbox.mailbox().clone(),
        }
    }
+
+    pub async fn prepare_topic(client: &Arc<Client>, topic_name: &str) {
+        let controller_client = client.controller_client().unwrap();
+        controller_client
+            .create_topic(topic_name.to_string(), 1, 1, 5000)
+            .await
+            .unwrap();
+    }
 }
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 65af855e04..8c51cdaf25 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -145,6 +145,12 @@ pub(crate) async fn prepare_test_for_kafka_log_store(factory: &LogStoreFactory)
 }
 
 pub(crate) async fn append_noop_record(client: &Client, topic: &str) {
+    let controller_client = client.controller_client().unwrap();
+    controller_client
+        .create_topic(topic, 1, 1, 5000)
+        .await
+        .unwrap();
+
    let partition_client = client
        .partition_client(topic, 0, UnknownTopicHandling::Retry)
        .await
diff --git a/tests-integration/fixtures/docker-compose.yml b/tests-integration/fixtures/docker-compose.yml
index cb2437d2ca..ef0206d4f0 100644
--- a/tests-integration/fixtures/docker-compose.yml
+++ b/tests-integration/fixtures/docker-compose.yml
@@ -14,7 +14,6 @@ services:
      - 9092:9092
      - 9093:9093
    environment:
-      # KRaft settings
      KAFKA_CFG_NODE_ID: "1"
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181
@@ -27,6 +26,7 @@ services:
      KAFKA_BROKER_ID: "1"
      KAFKA_CLIENT_USERS: "user_kafka"
      KAFKA_CLIENT_PASSWORDS: "secret"
+      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
    depends_on:
      zookeeper:
        condition: service_started
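Taken together with the compose change above, which turns off broker-side topic auto-creation, the behavior this patch fixes can be summarized in one hypothetical test: topics are provisioned explicitly, `auto_create_topics` is false, and `activate()` still succeeds because it only needs to append the noop entry and persist the topic names (previously it returned early and never wrote the noop entry). The test name below is invented; the helpers are the ones introduced in this patch.

```rust
// Hypothetical end-to-end check of the fix, run against the fixture broker
// (auto.create.topics.enable = "false") reachable via GT_KAFKA_ENDPOINTS.
#[tokio::test]
async fn test_activate_without_auto_topic_creation() {
    maybe_skip_kafka_integration_test!();
    // auto_create_topics = false: the pool must not rely on broker-side creation.
    let pool =
        test_kafka_topic_pool(get_kafka_endpoints(), 2, false, Some("noop_entry")).await;

    // Provision the topics out of band, as an operator would.
    pool.topic_creator().create_topics(&pool.topics).await.unwrap();

    // Activation appends the noop entry to each empty topic and persists the
    // topic names; before this patch it returned early when auto creation
    // was disabled, so the noop entry was never written.
    pool.activate().await.unwrap();
    assert_eq!(pool.topic_manager().list_topics().await.unwrap().len(), 2);
}
```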