From b0d2d26ad8e9663531df0ad31ee99eee5cd3f2a0 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 9 Dec 2025 12:45:47 +0800 Subject: [PATCH] chore: pick #6845 #6821 into release/v0.16 (#7368) --- src/common/meta/src/error.rs | 4 ---- .../wal_options_allocator/topic_creator.rs | 9 +++----- src/flow/src/heartbeat.rs | 7 +++++- src/frontend/src/heartbeat.rs | 23 +++++++++++-------- src/log-store/src/error.rs | 4 ---- src/log-store/src/kafka/client_manager.rs | 11 +++------ src/meta-srv/src/service/heartbeat.rs | 11 ++++++--- 7 files changed, 33 insertions(+), 36 deletions(-) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index be0f681d88..9539da39bb 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -528,9 +528,6 @@ pub enum Error { source: common_wal::error::Error, }, - #[snafu(display("Failed to resolve Kafka broker endpoint."))] - ResolveKafkaEndpoint { source: common_wal::error::Error }, - #[snafu(display("Failed to build a Kafka controller client"))] BuildKafkaCtrlClient { #[snafu(implicit)] @@ -1040,7 +1037,6 @@ impl ErrorExt for Error { | BuildKafkaClient { .. } | BuildKafkaCtrlClient { .. } | KafkaPartitionClient { .. } - | ResolveKafkaEndpoint { .. } | ProduceRecord { .. } | CreateKafkaWalTopic { .. } | EmptyTopicPool { .. } diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs index 3b34331126..f19b9476ee 100644 --- a/src/common/meta/src/wal_options_allocator/topic_creator.rs +++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs @@ -25,8 +25,7 @@ use snafu::ResultExt; use crate::error::{ BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, - KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, - Result, TlsConfigSnafu, + KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu, }; // Each topic only has one partition for now. @@ -209,10 +208,8 @@ impl KafkaTopicCreator { /// Builds a kafka [Client](rskafka::client::Client). pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result { // Builds an kafka controller client for creating topics. - let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints) - .await - .context(ResolveKafkaEndpointSnafu)?; - let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG); + let mut builder = ClientBuilder::new(connection.broker_endpoints.clone()) + .backoff_config(DEFAULT_BACKOFF_CONFIG); if let Some(sasl) = &connection.sasl { builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); }; diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index e686526726..0092a90276 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -218,6 +218,7 @@ impl HeartbeatTask { if let Some(message) = message { Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report) } else { + warn!("Sender has been dropped, exiting the heartbeat loop"); // Receives None that means Sender was dropped, we need to break the current loop break } @@ -259,7 +260,11 @@ impl HeartbeatTask { error!(e; "Error while handling heartbeat response"); } } - Ok(None) => break, + Ok(None) => { + warn!("Heartbeat response stream closed"); + capture_self.start_with_retry(retry_interval).await; + break; + } Err(e) => { error!(e; "Occur error while reading heartbeat response"); capture_self.start_with_retry(retry_interval).await; diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 47dac786b2..d60eb79125 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{ }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, warn}; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; use servers::addrs; use servers::heartbeat_options::HeartbeatOptions; @@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT}; pub struct HeartbeatTask { peer_addr: String, meta_client: Arc, - report_interval: u64, - retry_interval: u64, + report_interval: Duration, + retry_interval: Duration, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, start_time_ms: u64, } @@ -58,8 +58,8 @@ impl HeartbeatTask { HeartbeatTask { peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)), meta_client, - report_interval: heartbeat_opts.interval.as_millis() as u64, - retry_interval: heartbeat_opts.retry_interval.as_millis() as u64, + report_interval: heartbeat_opts.interval, + retry_interval: heartbeat_opts.retry_interval, resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, } @@ -103,13 +103,15 @@ impl HeartbeatTask { HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc(); } } - Ok(None) => break, + Ok(None) => { + warn!("Heartbeat response stream closed"); + capture_self.start_with_retry(retry_interval).await; + break; + } Err(e) => { HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc(); error!(e; "Occur error while reading heartbeat response"); - capture_self - .start_with_retry(Duration::from_millis(retry_interval)) - .await; + capture_self.start_with_retry(retry_interval).await; break; } @@ -177,12 +179,13 @@ impl HeartbeatTask { if let Some(message) = message { Self::new_heartbeat_request(&heartbeat_request, Some(message)) } else { + warn!("Sender has been dropped, exiting the heartbeat loop"); // Receives None that means Sender was dropped, we need to break the current loop break } } _ = &mut sleep => { - sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval)); + sleep.as_mut().reset(Instant::now() + report_interval); Self::new_heartbeat_request(&heartbeat_request, None) } }; diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index c8b9469542..b56334a01a 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -139,9 +139,6 @@ pub enum Error { error: rskafka::client::error::Error, }, - #[snafu(display("Failed to resolve Kafka broker endpoint."))] - ResolveKafkaEndpoint { source: common_wal::error::Error }, - #[snafu(display( "Failed to build a Kafka partition client, topic: {}, partition: {}", topic, @@ -343,7 +340,6 @@ impl ErrorExt for Error { StartWalTask { .. } | StopWalTask { .. } | IllegalState { .. } - | ResolveKafkaEndpoint { .. } | NoMaxValue { .. } | Cast { .. } | EncodeJson { .. } diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 1e172a268c..c4128b1648 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -24,9 +24,7 @@ use snafu::ResultExt; use store_api::logstore::provider::KafkaProvider; use tokio::sync::{Mutex, RwLock}; -use crate::error::{ - BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu, -}; +use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result, TlsConfigSnafu}; use crate::kafka::index::{GlobalIndexCollector, NoopCollector}; use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef}; @@ -78,11 +76,8 @@ impl ClientManager { high_watermark: Arc, u64>>, ) -> Result { // Sets backoff config for the top-level kafka client and all clients constructed by it. - let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints) - .await - .context(ResolveKafkaEndpointSnafu)?; - let mut builder = - ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG); + let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone()) + .backoff_config(DEFAULT_BACKOFF_CONFIG); if let Some(sasl) = &config.connection.sasl { builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); }; diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index fd9232ecd7..d1a3a0e636 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -27,10 +27,9 @@ use snafu::OptionExt; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Streaming}; +use tonic::{Request, Response, Status, Streaming}; -use crate::error; -use crate::error::Result; +use crate::error::{self, Result}; use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId}; use crate::metasrv::{Context, Metasrv}; use crate::metrics::METRIC_META_HEARTBEAT_RECV; @@ -109,6 +108,12 @@ impl heartbeat_server::Heartbeat for Metasrv { if is_not_leader { warn!("Quit because it is no longer the leader"); + let _ = tx + .send(Err(Status::aborted(format!( + "The requested metasrv node is not leader, node addr: {}", + ctx.server_addr + )))) + .await; break; } }