From 301ffc1d911c349f9f9086bae45bbe7a985a0539 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 29 Dec 2023 16:46:48 +0900 Subject: [PATCH] feat(remote_wal): append a noop record after kafka topic initialization (#3040) * feat: append a noop record after kafka topic initialization * chore: apply suggestions from CR * feat: ignore the noop record during the read --- src/common/meta/src/error.rs | 23 +++++++++ .../meta/src/wal/kafka/topic_manager.rs | 48 +++++++++++++++++-- src/log-store/src/kafka/log_store.rs | 4 ++ 3 files changed, 70 insertions(+), 5 deletions(-) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index c120c8ba93..323d922b9c 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -321,6 +321,27 @@ pub enum Error { error: rskafka::client::error::Error, }, + #[snafu(display( + "Failed to build a Kafka partition client, topic: {}, partition: {}", + topic, + partition + ))] + BuildKafkaPartitionClient { + topic: String, + partition: i32, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))] + ProduceRecord { + topic: String, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + #[snafu(display("Failed to create a Kafka wal topic"))] CreateKafkaWalTopic { location: Location, @@ -368,6 +389,8 @@ impl ErrorExt for Error { | EncodeWalOptions { .. } | BuildKafkaClient { .. } | BuildKafkaCtrlClient { .. } + | BuildKafkaPartitionClient { .. } + | ProduceRecord { .. } | CreateKafkaWalTopic { .. } | EmptyTopicPool { .. } => StatusCode::Unexpected, diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 860192b970..80aaa90d40 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -21,13 +21,16 @@ use common_telemetry::{debug, error, info}; use rskafka::client::controller::ControllerClient; use rskafka::client::error::Error as RsKafkaError; use rskafka::client::error::ProtocolError::TopicAlreadyExists; -use rskafka::client::ClientBuilder; +use rskafka::client::partition::{Compression, UnknownTopicHandling}; +use rskafka::client::{Client, ClientBuilder}; +use rskafka::record::Record; use rskafka::BackoffConfig; use snafu::{ensure, AsErrorSource, ResultExt}; use crate::error::{ - BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu, - EncodeJsonSnafu, InvalidNumTopicsSnafu, Result, + BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu, + CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu, + ProduceRecordSnafu, Result, }; use crate::kv_backend::KvBackendRef; use crate::rpc::store::PutRequest; @@ -37,6 +40,10 @@ use crate::wal::kafka::KafkaConfig; const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/"; +// Each topic only has one partition for now. +// The `DEFAULT_PARTITION` refers to the index of the partition. +const DEFAULT_PARTITION: i32 = 0; + /// Manages topic initialization and selection. pub struct TopicManager { config: KafkaConfig, @@ -117,14 +124,20 @@ impl TopicManager { .await .with_context(|_| BuildKafkaClientSnafu { broker_endpoints: self.config.broker_endpoints.clone(), - })? + })?; + + let control_client = client .controller_client() .context(BuildKafkaCtrlClientSnafu)?; // Try to create missing topics. let tasks = to_be_created .iter() - .map(|i| self.try_create_topic(&topics[*i], &client)) + .map(|i| async { + self.try_create_topic(&topics[*i], &control_client).await?; + self.try_append_noop_record(&topics[*i], &client).await?; + Ok(()) + }) .collect::>(); futures::future::try_join_all(tasks).await.map(|_| ()) } @@ -141,6 +154,31 @@ impl TopicManager { .collect() } + async fn try_append_noop_record(&self, topic: &Topic, client: &Client) -> Result<()> { + let partition_client = client + .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) + .await + .context(BuildKafkaPartitionClientSnafu { + topic, + partition: DEFAULT_PARTITION, + })?; + + partition_client + .produce( + vec![Record { + key: None, + value: None, + timestamp: rskafka::chrono::Utc::now(), + headers: Default::default(), + }], + Compression::NoCompression, + ) + .await + .context(ProduceRecordSnafu { topic })?; + + Ok(()) + } + async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> { match client .create_topic( diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 73b0fe1de2..df64fa6657 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -186,6 +186,10 @@ impl LogStore for KafkaLogStore { record_offset, ns_clone, high_watermark ); + // Ignores the noop record. + if record.record.value.is_none() { + continue; + } let entries = decode_from_record(record.record)?; // Filters entries by region id.