fix: append noop entry when auto topic creation is disabled (#6092)

* feat: improve topic management and add stale records cleanup

* fix: fix unit tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-05-16 19:26:47 +08:00
committed by GitHub
parent 0ea9ab385d
commit 864cc117b3
15 changed files with 618 additions and 181 deletions

View File

@@ -514,11 +514,25 @@ pub enum Error {
},
#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
"Failed to get a Kafka partition client, topic: {}, partition: {}",
topic,
partition
))]
BuildKafkaPartitionClient {
KafkaPartitionClient {
topic: String,
partition: i32,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display(
"Failed to get offset from Kafka, topic: {}, partition: {}",
topic,
partition
))]
KafkaGetOffset {
topic: String,
partition: i32,
#[snafu(implicit)]
@@ -843,7 +857,7 @@ impl ErrorExt for Error {
| EncodeWalOptions { .. }
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| BuildKafkaPartitionClient { .. }
| KafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
@@ -852,7 +866,8 @@ impl ErrorExt for Error {
| ProcedureOutput { .. }
| FromUtf8 { .. }
| MetadataCorruption { .. }
| ParseWalOptions { .. } => StatusCode::Unexpected,
| ParseWalOptions { .. }
| KafkaGetOffset { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,

View File

@@ -20,6 +20,8 @@ use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::ddl::flow_meta::FlowMetadataAllocator;
@@ -37,7 +39,8 @@ use crate::peer::{Peer, PeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
use crate::wal_options_allocator::{build_kafka_topic_creator, WalOptionsAllocator};
use crate::{DatanodeId, FlownodeId};
#[async_trait::async_trait]
@@ -199,3 +202,34 @@ impl PeerLookupService for NoopPeerLookupService {
Ok(Some(Peer::empty(id)))
}
}
/// Create a kafka topic pool for testing.
pub async fn test_kafka_topic_pool(
broker_endpoints: Vec<String>,
num_topics: usize,
auto_create_topics: bool,
topic_name_prefix: Option<&str>,
) -> KafkaTopicPool {
let mut config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic: KafkaTopicConfig {
num_topics,
..Default::default()
},
auto_create_topics,
..Default::default()
};
if let Some(prefix) = topic_name_prefix {
config.kafka_topic.topic_name_prefix = prefix.to_string();
}
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
.await
.unwrap();
KafkaTopicPool::new(&config, kv_backend, topic_creator)
}

View File

@@ -112,7 +112,9 @@ pub async fn build_wal_options_allocator(
NAME_PATTERN_REGEX.is_match(prefix),
InvalidTopicNamePrefixSnafu { prefix }
);
let topic_creator = build_kafka_topic_creator(kafka_config).await?;
let topic_creator =
build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
.await?;
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
Ok(WalOptionsAllocator::Kafka(topic_pool))
}
@@ -151,13 +153,16 @@ pub fn prepare_wal_options(
mod tests {
use std::assert_matches::assert_matches;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use super::*;
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
#[tokio::test]
@@ -197,55 +202,42 @@ mod tests {
assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
}
// Tests that the wal options allocator could successfully allocate Kafka wal options.
#[tokio::test]
async fn test_allocator_with_kafka() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let topics = (0..256)
.map(|i| format!("test_allocator_with_kafka_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
topic_pool.topics.clone_from(&topics);
topic_pool.selector = Arc::new(selector::RoundRobinTopicSelector::default());
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
.map(|i| {
let options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[i as usize].clone(),
});
(i, serde_json::to_string(&options).unwrap())
})
.collect::<HashMap<_, _>>();
assert_eq!(got, expected);
})
})
async fn test_allocator_with_kafka_allocate_wal_options() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let num_topics = 5;
let mut topic_pool = test_kafka_topic_pool(
get_kafka_endpoints(),
num_topics,
true,
Some("test_allocator_with_kafka"),
)
.await;
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
let topics = topic_pool.topics.clone();
// clean up the topics before test
let topic_creator = topic_pool.topic_creator();
topic_creator.delete_topics(&topics).await.unwrap();
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();
let num_regions = 3;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
.map(|i| {
let options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[i as usize].clone(),
});
(i, serde_json::to_string(&options).unwrap())
})
.collect::<HashMap<_, _>>();
assert_eq!(got, expected);
}
#[tokio::test]

View File

@@ -12,20 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{error, info};
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_telemetry::{debug, error, info};
use common_wal::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG,
};
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use snafu::ResultExt;
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
TlsConfigSnafu,
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
Result, TlsConfigSnafu,
};
// Each topic only has one partition for now.
@@ -70,21 +71,47 @@ impl KafkaTopicCreator {
info!("The topic {} already exists", topic);
Ok(())
} else {
error!("Failed to create a topic {}, error {:?}", topic, e);
error!(e; "Failed to create a topic {}", topic);
Err(e).context(CreateKafkaWalTopicSnafu)
}
}
}
}
async fn append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
let partition_client = client
async fn prepare_topic(&self, topic: &String) -> Result<()> {
let partition_client = self.partition_client(topic).await?;
self.append_noop_record(topic, &partition_client).await?;
Ok(())
}
/// Creates a [PartitionClient] for the given topic.
async fn partition_client(&self, topic: &str) -> Result<PartitionClient> {
self.client
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
.context(BuildKafkaPartitionClientSnafu {
.context(KafkaPartitionClientSnafu {
topic,
partition: DEFAULT_PARTITION,
})
}
/// Appends a noop record to the topic.
/// It only appends a noop record if the topic is empty.
async fn append_noop_record(
&self,
topic: &String,
partition_client: &PartitionClient,
) -> Result<()> {
let end_offset = partition_client
.get_offset(OffsetAt::Latest)
.await
.context(KafkaGetOffsetSnafu {
topic: topic.to_string(),
partition: DEFAULT_PARTITION,
})?;
if end_offset > 0 {
return Ok(());
}
partition_client
.produce(
@@ -98,22 +125,28 @@ impl KafkaTopicCreator {
)
.await
.context(ProduceRecordSnafu { topic })?;
debug!("Appended a noop record to topic {}", topic);
Ok(())
}
/// Creates topics in Kafka.
pub async fn create_topics(&self, topics: &[String]) -> Result<()> {
let tasks = topics
.iter()
.map(|topic| async { self.create_topic(topic, &self.client).await })
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
/// Prepares topics in Kafka.
/// 1. Creates missing topics.
/// 2. Appends a noop record to each topic.
pub async fn prepare_topics(&self, topics: &[&String]) -> Result<()> {
///
/// It appends a noop record to each topic if the topic is empty.
pub async fn prepare_topics(&self, topics: &[String]) -> Result<()> {
// Try to create missing topics.
let tasks = topics
.iter()
.map(|topic| async {
self.create_topic(topic, &self.client).await?;
self.append_noop_record(topic, &self.client).await?;
Ok(())
})
.map(|topic| async { self.prepare_topic(topic).await })
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
@@ -129,34 +162,244 @@ impl KafkaTopicCreator {
}
}
#[cfg(test)]
impl KafkaTopicCreator {
pub async fn delete_topics(&self, topics: &[String]) -> Result<()> {
let tasks = topics
.iter()
.map(|topic| async { self.delete_topic(topic, &self.client).await })
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.map(|_| ())
}
async fn delete_topic(&self, topic: &String, client: &Client) -> Result<()> {
let controller = client
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;
match controller.delete_topic(topic, 10).await {
Ok(_) => {
info!("Successfully deleted topic {}", topic);
Ok(())
}
Err(e) => {
if Self::is_unknown_topic_err(&e) {
info!("The topic {} does not exist", topic);
Ok(())
} else {
panic!("Failed to delete a topic {}, error: {}", topic, e);
}
}
}
}
fn is_unknown_topic_err(e: &RsKafkaError) -> bool {
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: rskafka::client::error::ProtocolError::UnknownTopicOrPartition,
..
}
)
}
pub async fn get_partition_client(&self, topic: &str) -> PartitionClient {
self.partition_client(topic).await.unwrap()
}
}
/// Builds a kafka [Client](rskafka::client::Client).
pub async fn build_kafka_client(config: &MetasrvKafkaConfig) -> Result<Client> {
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
// Builds an kafka controller client for creating topics.
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
if let Some(sasl) = &config.connection.sasl {
if let Some(sasl) = &connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
if let Some(tls) = &config.connection.tls {
if let Some(tls) = &connection.tls {
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
};
builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: config.connection.broker_endpoints.clone(),
broker_endpoints: connection.broker_endpoints.clone(),
})
}
/// Builds a [KafkaTopicCreator].
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
let client = build_kafka_client(config).await?;
pub async fn build_kafka_topic_creator(
connection: &KafkaConnectionConfig,
kafka_topic: &KafkaTopicConfig,
) -> Result<KafkaTopicCreator> {
let client = build_kafka_client(connection).await?;
Ok(KafkaTopicCreator {
client,
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
num_partitions: kafka_topic.num_partitions,
replication_factor: kafka_topic.replication_factor,
create_topic_timeout: kafka_topic.create_topic_timeout.as_millis() as i32,
})
}
#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use super::*;
async fn test_topic_creator(broker_endpoints: Vec<String>) -> KafkaTopicCreator {
let connection = KafkaConnectionConfig {
broker_endpoints,
..Default::default()
};
let kafka_topic = KafkaTopicConfig::default();
build_kafka_topic_creator(&connection, &kafka_topic)
.await
.unwrap()
}
async fn append_records(partition_client: &PartitionClient, num_records: usize) -> Result<()> {
for i in 0..num_records {
partition_client
.produce(
vec![Record {
key: Some(b"test".to_vec()),
value: Some(format!("test {}", i).as_bytes().to_vec()),
timestamp: chrono::Utc::now(),
headers: Default::default(),
}],
Compression::Lz4,
)
.await
.unwrap();
}
Ok(())
}
#[tokio::test]
async fn test_append_noop_record_to_empty_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "append_noop_record_to_empty_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 0);
// The topic is not empty, so no noop record is appended.
creator
.append_noop_record(&topic, &partition_client)
.await
.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 1);
}
#[tokio::test]
async fn test_append_noop_record_to_non_empty_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "append_noop_record_to_non_empty_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
append_records(&partition_client, 2).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 2);
// The topic is not empty, so no noop record is appended.
creator
.append_noop_record(&topic, &partition_client)
.await
.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 2);
}
#[tokio::test]
async fn test_create_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "create_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
// Should be ok
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 0);
}
#[tokio::test]
async fn test_prepare_topic() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "prepare_topic";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
creator.prepare_topic(&topic).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
let start_offset = partition_client
.get_offset(OffsetAt::Earliest)
.await
.unwrap();
assert_eq!(start_offset, 0);
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 1);
}
#[tokio::test]
async fn test_prepare_topic_with_stale_records_without_pruning() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "prepare_topic_with_stale_records_without_pruning";
let creator = test_topic_creator(get_kafka_endpoints()).await;
let topic = format!("{}{}", prefix, "0");
// Clean up the topics before test
creator.delete_topics(&[topic.to_string()]).await.unwrap();
creator.create_topics(&[topic.to_string()]).await.unwrap();
let partition_client = creator.partition_client(&topic).await.unwrap();
append_records(&partition_client, 10).await.unwrap();
creator.prepare_topic(&topic).await.unwrap();
let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();
assert_eq!(end_offset, 10);
let start_offset = partition_client
.get_offset(OffsetAt::Earliest)
.await
.unwrap();
assert_eq!(start_offset, 0);
}
}

View File

@@ -40,24 +40,21 @@ impl KafkaTopicManager {
Ok(topics)
}
/// Restores topics from the key-value backend. and returns the topics that are not stored in kvbackend.
pub async fn get_topics_to_create<'a>(
&self,
all_topics: &'a [String],
) -> Result<Vec<&'a String>> {
/// Returns the topics that are not prepared.
pub async fn unprepare_topics(&self, all_topics: &[String]) -> Result<Vec<String>> {
let existing_topics = self.restore_topics().await?;
let existing_topic_set = existing_topics.iter().collect::<HashSet<_>>();
let mut topics_to_create = Vec::with_capacity(all_topics.len());
for topic in all_topics {
if !existing_topic_set.contains(topic) {
topics_to_create.push(topic);
topics_to_create.push(topic.to_string());
}
}
Ok(topics_to_create)
}
/// Persists topics into the key-value backend.
pub async fn persist_topics(&self, topics: &[String]) -> Result<()> {
/// Persists prepared topics into the key-value backend.
pub async fn persist_prepared_topics(&self, topics: &[String]) -> Result<()> {
self.topic_name_manager
.batch_put(
topics
@@ -70,6 +67,14 @@ impl KafkaTopicManager {
}
}
#[cfg(test)]
impl KafkaTopicManager {
/// Lists all topics in the key-value backend.
pub async fn list_topics(&self) -> Result<Vec<String>> {
self.topic_name_manager.range().await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -90,11 +95,11 @@ mod tests {
// No legacy topics.
let mut topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.unprepare_topics(&all_topics)
.await
.unwrap();
topics_to_be_created.sort();
let mut expected = all_topics.iter().collect::<Vec<_>>();
let mut expected = all_topics.clone();
expected.sort();
assert_eq!(expected, topics_to_be_created);
@@ -109,7 +114,7 @@ mod tests {
assert!(res.prev_kv.is_none());
let topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.unprepare_topics(&all_topics)
.await
.unwrap();
assert!(topics_to_be_created.is_empty());
@@ -144,21 +149,21 @@ mod tests {
let topic_kvbackend_manager = KafkaTopicManager::new(kv_backend);
let mut topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.unprepare_topics(&all_topics)
.await
.unwrap();
topics_to_be_created.sort();
let mut expected = all_topics.iter().collect::<Vec<_>>();
let mut expected = all_topics.clone();
expected.sort();
assert_eq!(expected, topics_to_be_created);
// Persists topics to kv backend.
topic_kvbackend_manager
.persist_topics(&all_topics)
.persist_prepared_topics(&all_topics)
.await
.unwrap();
let topics_to_be_created = topic_kvbackend_manager
.get_topics_to_create(&all_topics)
.unprepare_topics(&all_topics)
.await
.unwrap();
assert!(topics_to_be_created.is_empty());

View File

@@ -15,6 +15,7 @@
use std::fmt::{self, Formatter};
use std::sync::Arc;
use common_telemetry::info;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::TopicSelectorType;
use snafu::ensure;
@@ -77,27 +78,35 @@ impl KafkaTopicPool {
}
/// Tries to activate the topic manager when metasrv becomes the leader.
///
/// First tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, it will try to contact the Kafka cluster and request creating more topics.
/// If there are unprepared topics (topics that exist in the configuration but not in the kv backend),
/// it will create these topics in Kafka if `auto_create_topics` is enabled.
///
/// Then it prepares all unprepared topics by appending a noop record if the topic is empty,
/// and persists them in the kv backend for future use.
pub async fn activate(&self) -> Result<()> {
if !self.auto_create_topics {
return Ok(());
}
let num_topics = self.topics.len();
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
let topics_to_be_created = self
.topic_manager
.get_topics_to_create(&self.topics)
.await?;
let unprepared_topics = self.topic_manager.unprepare_topics(&self.topics).await?;
if !topics_to_be_created.is_empty() {
if !unprepared_topics.is_empty() {
if self.auto_create_topics {
info!("Creating {} topics", unprepared_topics.len());
self.topic_creator.create_topics(&unprepared_topics).await?;
} else {
info!("Auto create topics is disabled, skipping topic creation.");
}
self.topic_creator
.prepare_topics(&topics_to_be_created)
.prepare_topics(&unprepared_topics)
.await?;
self.topic_manager
.persist_prepared_topics(&unprepared_topics)
.await?;
self.topic_manager.persist_topics(&self.topics).await?;
}
info!("Activated topic pool with {} topics", self.topics.len());
Ok(())
}
@@ -114,77 +123,147 @@ impl KafkaTopicPool {
}
}
#[cfg(test)]
impl KafkaTopicPool {
pub(crate) fn topic_manager(&self) -> &KafkaTopicManager {
&self.topic_manager
}
pub(crate) fn topic_creator(&self) -> &KafkaTopicCreator {
&self.topic_creator
}
}
#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::test_util::run_test_with_kafka_wal;
use std::assert_matches::assert_matches;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
use crate::error::Error;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
#[tokio::test]
async fn test_pool_invalid_number_topics_err() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let endpoints = get_kafka_endpoints();
let pool = test_kafka_topic_pool(endpoints.clone(), 0, false, None).await;
let err = pool.activate().await.unwrap_err();
assert_matches!(err, Error::InvalidNumTopics { .. });
let pool = test_kafka_topic_pool(endpoints, 0, true, None).await;
let err = pool.activate().await.unwrap_err();
assert_matches!(err, Error::InvalidNumTopics { .. });
}
#[tokio::test]
async fn test_pool_activate_unknown_topics_err() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let pool =
test_kafka_topic_pool(get_kafka_endpoints(), 1, false, Some("unknown_topic")).await;
let err = pool.activate().await.unwrap_err();
assert_matches!(err, Error::KafkaPartitionClient { .. });
}
#[tokio::test]
async fn test_pool_activate() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let pool =
test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some("pool_activate")).await;
// clean up the topics before test
let topic_creator = pool.topic_creator();
topic_creator.delete_topics(&pool.topics).await.unwrap();
let topic_manager = pool.topic_manager();
pool.activate().await.unwrap();
let topics = topic_manager.list_topics().await.unwrap();
assert_eq!(topics.len(), 2);
}
#[tokio::test]
async fn test_pool_activate_with_existing_topics() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let prefix = "pool_activate_with_existing_topics";
let pool = test_kafka_topic_pool(get_kafka_endpoints(), 2, true, Some(prefix)).await;
let topic_creator = pool.topic_creator();
topic_creator.delete_topics(&pool.topics).await.unwrap();
let topic_manager = pool.topic_manager();
// persists one topic info, then pool.activate() will create new topics that not persisted.
topic_manager
.persist_prepared_topics(&pool.topics[0..1])
.await
.unwrap();
pool.activate().await.unwrap();
let topics = topic_manager.list_topics().await.unwrap();
assert_eq!(topics.len(), 2);
let client = pool.topic_creator().client();
let topics = client
.list_topics()
.await
.unwrap()
.into_iter()
.filter(|t| t.name.starts_with(prefix))
.collect::<Vec<_>>();
assert_eq!(topics.len(), 1);
}
/// Tests that the topic manager could allocate topics correctly.
#[tokio::test]
async fn test_alloc_topics() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
// Constructs topics that should be created.
let topics = (0..256)
.map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
.collect::<Vec<_>>();
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let mut topic_pool = KafkaTopicPool::new(&config, kv_backend, topic_creator);
// Replaces the default topic pool with the constructed topics.
topic_pool.topics.clone_from(&topics);
// Replaces the default selector with a round-robin selector without shuffled.
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
topic_pool.activate().await.unwrap();
// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| topic_pool.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects exactly the number of `num_topics` topics in a batching manner.
let got = topic_pool
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects more than the number of `num_topics` topics.
let got = topic_pool
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
})
})
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let num_topics = 5;
let mut topic_pool = test_kafka_topic_pool(
get_kafka_endpoints(),
num_topics,
true,
Some("test_allocator_with_kafka"),
)
.await;
topic_pool.selector = Arc::new(RoundRobinTopicSelector::default());
let topics = topic_pool.topics.clone();
// clean up the topics before test
let topic_creator = topic_pool.topic_creator();
topic_creator.delete_topics(&topics).await.unwrap();
// Selects exactly the number of `num_topics` topics one by one.
let got = (0..topics.len())
.map(|_| topic_pool.select().unwrap())
.cloned()
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects exactly the number of `num_topics` topics in a batching manner.
let got = topic_pool
.select_batch(topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
assert_eq!(got, topics);
// Selects more than the number of `num_topics` topics.
let got = topic_pool
.select_batch(2 * topics.len())
.unwrap()
.into_iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let expected = vec![topics.clone(); 2]
.into_iter()
.flatten()
.collect::<Vec<_>>();
assert_eq!(got, expected);
}
}

View File

@@ -23,11 +23,16 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
/// The default backoff config for kafka client.
///
/// If the operation fails, the client will retry 3 times.
/// The backoff time is 100ms, 300ms, 900ms.
pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
init_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(10),
base: 2.0,
deadline: Some(Duration::from_secs(120)),
max_backoff: Duration::from_secs(1),
base: 3.0,
// The deadline shouldn't be too long,
// otherwise the client will block the worker loop for a long time.
deadline: Some(Duration::from_secs(3)),
};
/// Default interval for auto WAL pruning.

View File

@@ -31,3 +31,33 @@ where
test(endpoints).await
}
/// Get the kafka endpoints from the environment variable `GT_KAFKA_ENDPOINTS`.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// ```
pub fn get_kafka_endpoints() -> Vec<String> {
let endpoints = std::env::var("GT_KAFKA_ENDPOINTS").unwrap();
endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>()
}
#[macro_export]
/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_kafka_integration_test {
() => {
if std::env::var("GT_KAFKA_ENDPOINTS").is_err() {
common_telemetry::warn!("The endpoints is empty, skipping the test");
return;
}
};
}

View File

@@ -182,6 +182,14 @@ impl ClientManager {
}
}
#[cfg(test)]
impl ClientManager {
/// Returns the controller client.
pub(crate) fn controller_client(&self) -> rskafka::client::controller::ControllerClient {
self.client.controller_client().unwrap()
}
}
#[cfg(test)]
mod tests {
use common_wal::test_util::run_test_with_kafka_wal;

View File

@@ -552,6 +552,14 @@ mod tests {
.collect()
}
async fn prepare_topic(logstore: &KafkaLogStore, topic_name: &str) {
let controller_client = logstore.client_manager.controller_client();
controller_client
.create_topic(topic_name.to_string(), 1, 1, 5000)
.await
.unwrap();
}
#[tokio::test]
async fn test_append_batch_basic() {
common_telemetry::init_default_ut_logging();
@@ -573,7 +581,9 @@ mod tests {
};
let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
let topic_name = uuid::Uuid::new_v4().to_string();
prepare_topic(&logstore, &topic_name).await;
let provider = Provider::kafka_provider(topic_name);
let region_entries = (0..5)
.map(|i| {
let region_id = RegionId::new(1, i);
@@ -647,6 +657,7 @@ mod tests {
};
let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
let topic_name = uuid::Uuid::new_v4().to_string();
prepare_topic(&logstore, &topic_name).await;
let provider = Provider::kafka_provider(topic_name);
let region_entries = (0..5)
.map(|i| {

View File

@@ -377,7 +377,7 @@ impl MetasrvBuilder {
let (tx, rx) = WalPruneManager::channel();
// Safety: Must be remote WAL.
let remote_wal_options = options.wal.remote_wal_options().unwrap();
let kafka_client = build_kafka_client(remote_wal_options)
let kafka_client = build_kafka_client(&remote_wal_options.connection)
.await
.context(error::BuildKafkaClientSnafu)?;
let wal_prune_context = WalPruneContext {

View File

@@ -52,7 +52,7 @@ use crate::Result;
pub type KafkaClientRef = Arc<Client>;
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(1);
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);
/// The state of WAL pruning.
#[derive(Debug, Serialize, Deserialize)]
@@ -558,6 +558,7 @@ mod tests {
topic_name = format!("test_procedure_execution-{}", topic_name);
let mut env = TestEnv::new();
let context = env.build_wal_prune_context(broker_endpoints).await;
TestEnv::prepare_topic(&context.client, &topic_name).await;
let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);
// Before any data in kvbackend is mocked, should return a retryable error.

View File

@@ -78,7 +78,7 @@ impl TestEnv {
kafka_topic,
..Default::default()
};
Arc::new(build_kafka_client(&config).await.unwrap())
Arc::new(build_kafka_client(&config.connection).await.unwrap())
}
pub async fn build_wal_prune_context(&self, broker_endpoints: Vec<String>) -> WalPruneContext {
@@ -91,4 +91,12 @@ impl TestEnv {
mailbox: self.mailbox.mailbox().clone(),
}
}
pub async fn prepare_topic(client: &Arc<Client>, topic_name: &str) {
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(topic_name.to_string(), 1, 1, 5000)
.await
.unwrap();
}
}

View File

@@ -145,6 +145,12 @@ pub(crate) async fn prepare_test_for_kafka_log_store(factory: &LogStoreFactory)
}
pub(crate) async fn append_noop_record(client: &Client, topic: &str) {
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(topic, 1, 1, 5000)
.await
.unwrap();
let partition_client = client
.partition_client(topic, 0, UnknownTopicHandling::Retry)
.await

View File

@@ -14,7 +14,6 @@ services:
- 9092:9092
- 9093:9093
environment:
# KRaft settings
KAFKA_CFG_NODE_ID: "1"
KAFKA_CFG_PROCESS_ROLES: broker,controller
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181
@@ -27,6 +26,7 @@ services:
KAFKA_BROKER_ID: "1"
KAFKA_CLIENT_USERS: "user_kafka"
KAFKA_CLIENT_PASSWORDS: "secret"
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: false
depends_on:
zookeeper:
condition: service_started